feat(scheduler): rename PluginContext to CycleState

This commit is contained in:
draveness
2019-10-03 09:46:35 +08:00
parent 3c0bc3c5ad
commit c73ff9749b
20 changed files with 294 additions and 294 deletions

View File

@@ -87,9 +87,9 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
// find the config args of a plugin
pc := pluginConfig[name]
state := pluginConfig[name]
p, err := factory(pc, f)
p, err := factory(state, f)
if err != nil {
return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
}
@@ -205,9 +205,9 @@ func (f *framework) QueueSortFunc() LessFunc {
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPreFilterPlugins(
pc *PluginContext, pod *v1.Pod) *Status {
state *CycleState, pod *v1.Pod) *Status {
for _, pl := range f.preFilterPlugins {
status := pl.PreFilter(pc, pod)
status := pl.PreFilter(state, pod)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
@@ -226,13 +226,13 @@ func (f *framework) RunPreFilterPlugins(
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().AddPod(pc, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
if status := pl.Extensions().AddPod(state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
@@ -246,13 +246,13 @@ func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().RemovePod(pc, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
if status := pl.Extensions().RemovePod(state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
@@ -267,10 +267,10 @@ func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSched
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *framework) RunFilterPlugins(pc *PluginContext,
func (f *framework) RunFilterPlugins(state *CycleState,
pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.filterPlugins {
status := pl.Filter(pc, pod, nodeInfo)
status := pl.Filter(state, pod, nodeInfo)
if !status.IsSuccess() {
if !status.IsUnschedulable() {
errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
@@ -289,13 +289,13 @@ func (f *framework) RunFilterPlugins(pc *PluginContext,
// of these plugins returns any status other than "Success", the given node is
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
func (f *framework) RunPostFilterPlugins(
pc *PluginContext,
state *CycleState,
pod *v1.Pod,
nodes []*v1.Node,
filteredNodesStatuses NodeToStatusMap,
) *Status {
for _, pl := range f.postFilterPlugins {
status := pl.PostFilter(pc, pod, nodes, filteredNodesStatuses)
status := pl.PostFilter(state, pod, nodes, filteredNodesStatuses)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@@ -310,7 +310,7 @@ func (f *framework) RunPostFilterPlugins(
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) {
func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) {
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
@@ -322,7 +322,7 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
score, status := pl.Score(pc, pod, nodeName)
score, status := pl.Score(state, pod, nodeName)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return
@@ -346,7 +346,7 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
if pl.Extensions() == nil {
return
}
if status := pl.Extensions().NormalizeScore(pc, pod, nodeScoreList); !status.IsSuccess() {
if status := pl.Extensions().NormalizeScore(state, pod, nodeScoreList); !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
return
@@ -388,9 +388,9 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
func (f *framework) RunPreBindPlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.preBindPlugins {
status := pl.PreBind(pc, pod, nodeName)
status := pl.PreBind(state, pod, nodeName)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@@ -401,13 +401,13 @@ func (f *framework) RunPreBindPlugins(
}
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status {
if len(f.bindPlugins) == 0 {
return NewStatus(Skip, "")
}
var status *Status
for _, bp := range f.bindPlugins {
status = bp.Bind(pc, pod, nodeName)
status = bp.Bind(state, pod, nodeName)
if status != nil && status.Code() == Skip {
continue
}
@@ -423,9 +423,9 @@ func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName stri
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostBindPlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) {
state *CycleState, pod *v1.Pod, nodeName string) {
for _, pl := range f.postBindPlugins {
pl.PostBind(pc, pod, nodeName)
pl.PostBind(state, pod, nodeName)
}
}
@@ -433,9 +433,9 @@ func (f *framework) RunPostBindPlugins(
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
func (f *framework) RunReservePlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.reservePlugins {
status := pl.Reserve(pc, pod, nodeName)
status := pl.Reserve(state, pod, nodeName)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@@ -447,9 +447,9 @@ func (f *framework) RunReservePlugins(
// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) {
state *CycleState, pod *v1.Pod, nodeName string) {
for _, pl := range f.unreservePlugins {
pl.Unreserve(pc, pod, nodeName)
pl.Unreserve(state, pod, nodeName)
}
}
@@ -461,11 +461,11 @@ func (f *framework) RunUnreservePlugins(
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
func (f *framework) RunPermitPlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) *Status {
timeout := maxTimeout
statusCode := Success
for _, pl := range f.permitPlugins {
status, d := pl.Permit(pc, pod, nodeName)
status, d := pl.Permit(state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
@@ -535,13 +535,13 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
}
func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
pc := make(map[string]*runtime.Unknown, 0)
state := make(map[string]*runtime.Unknown, 0)
for i := range args {
// This is needed because the type of PluginConfig.Args is not pointer type.
p := args[i]
pc[p.Name] = &p.Args
state[p.Name] = &p.Args
}
return pc
return state
}
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {