implement inter pod topological affinity and anti-affinity

Kevin
2016-05-04 06:50:31 +00:00
parent 28a8a23471
commit 82ba4f077e
46 changed files with 8302 additions and 814 deletions
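The new predicate reads its rules from the pod-level affinity annotation (api.AffinityAnnotationKey). A minimal sketch of a pod requesting hard inter-pod affinity, mirroring the test fixtures in this commit; the label values are illustrative only:

	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Labels: map[string]string{"security": "S1"},
			Annotations: map[string]string{
				api.AffinityAnnotationKey: `
				{"podAffinity": {
					"requiredDuringSchedulingIgnoredDuringExecution": [{
						"labelSelector": {
							"matchExpressions": [{
								"key": "service",
								"operator": "In",
								"values": ["securityscan"]
							}]
						},
						"topologyKey": "region"
					}]
				}}`,
			},
		},
	}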

View File

@@ -30,6 +30,7 @@ var (
ErrDiskConflict = newPredicateFailureError("NoDiskConflict")
ErrVolumeZoneConflict = newPredicateFailureError("NoVolumeZoneConflict")
ErrNodeSelectorNotMatch = newPredicateFailureError("MatchNodeSelector")
ErrPodAffinityNotMatch = newPredicateFailureError("MatchInterPodAffinity")
ErrPodNotMatchHostName = newPredicateFailureError("HostName")
ErrPodNotFitsHostPorts = newPredicateFailureError("PodFitsHostPorts")
ErrNodeLabelPresenceViolated = newPredicateFailureError("CheckNodeLabelPresence")

View File

@@ -19,14 +19,14 @@ package predicates
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/unversioned"
)
type NodeInfo interface {
@@ -723,3 +723,198 @@ func GeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.N
}
return true, nil
}
type PodAffinityChecker struct {
info NodeInfo
podLister algorithm.PodLister
failureDomains priorityutil.Topologies
}
func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failureDomains []string) algorithm.FitPredicate {
checker := &PodAffinityChecker{
info: info,
podLister: podLister,
failureDomains: priorityutil.Topologies{DefaultKeys: failureDomains},
}
return checker.InterPodAffinityMatches
}
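// Usage sketch: wiring the predicate the way the tests in this commit do.
// FakeNodeInfo and algorithm.FakePodLister are test fakes, not the
// scheduler's real informers, and this is not the actual registration path:
//
//	predicate := NewPodAffinityPredicate(
//		FakeNodeInfo(node),
//		algorithm.FakePodLister(existingPods),
//		strings.Split(api.DefaultFailureDomains, ","),
//	)
//	fits, err := predicate(pod, node.Name, schedulercache.NewNodeInfo(podsOnNode...))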
func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := checker.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
allPods, err := checker.podLister.List(labels.Everything())
if err != nil {
return false, err
}
if checker.NodeMatchPodAffinityAntiAffinity(pod, allPods, node) {
return true, nil
}
return false, ErrPodAffinityNotMatch
}
// AnyPodMatchesPodAffinityTerm checks if any of the given pods matches the specific podAffinityTerm.
func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) {
for _, ep := range allPods {
match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
func(ep *api.Pod) (*api.Node, error) { return checker.info.GetNodeInfo(ep.Spec.NodeName) },
func(pod *api.Pod) (*api.Node, error) { return node, nil },
)
if err != nil || match {
return match, err
}
}
return false, nil
}
// NodeMatchesHardPodAffinity checks whether all the required pod affinity
// scheduling rules of the pod are satisfied on the given node, and returns true if so.
func (checker *PodAffinityChecker) NodeMatchesHardPodAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinity *api.PodAffinity) bool {
var podAffinityTerms []api.PodAffinityTerm
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAffinityTerms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAffinityTerms = append(podAffinityTerms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, podAffinityTerm := range podAffinityTerms {
podAffinityTermMatches, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAffinityTerm)
if err != nil {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, an error ocurred when checking existing pods on the node for PodAffinityTerm %v err: %v",
podName(pod), node.Name, podAffinityTerm, err)
return false
}
if !podAffinityTermMatches {
// TODO: Think about whether this can be simplified once we have controllerRef
// Check if it is in special case that the requiredDuringScheduling affinity requirement can be disregarded.
// If the requiredDuringScheduling affinity requirement matches a pod's own labels and namespace, and there are no other such pods
// anywhere, then disregard the requirement.
// This allows rules like "schedule all of the pods of this collection to the same zone" to not block forever
// because the first pod of the collection can't be scheduled.
names := priorityutil.GetNamespacesFromPodAffinityTerm(pod, podAffinityTerm)
labelSelector, err := unversioned.LabelSelectorAsSelector(podAffinityTerm.LabelSelector)
if err != nil || !names.Has(pod.Namespace) || !labelSelector.Matches(labels.Set(pod.Labels)) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v, err: %+v",
podName(pod), node.Name, podAffinityTerm, err)
return false
}
// the affinity is to put the pod together with other pods from the same service or controller
filteredPods := priorityutil.FilterPodsByNameSpaces(names, allPods)
for _, filteredPod := range filteredPods {
// if an existing pod from the same service or RC is found,
// the affinity scheduling rules cannot be disregarded.
if labelSelector.Matches(labels.Set(filteredPod.Labels)) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v",
podName(pod), node.Name, podAffinityTerm)
return false
}
}
}
}
// all the required pod affinity scheduling rules satisfied
glog.V(10).Infof("All the required pod affinity scheduling rules are satisfied for Pod %+v, on node %v", podName(pod), node.Name)
return true
}
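// Illustration of the disregard rule above (a sketch; the label values are
// illustrative). This pod's affinity term selects its own label, so when no
// matching pod exists anywhere yet, the term is disregarded and the first pod
// of the collection can still schedule (the last case of
// TestInterPodAffinityWithMultipleNodes exercises exactly this):
//
//	firstPod := &api.Pod{
//		ObjectMeta: api.ObjectMeta{
//			Labels: map[string]string{"foo": "bar"},
//			Annotations: map[string]string{
//				api.AffinityAnnotationKey: `
//				{"podAffinity": {
//					"requiredDuringSchedulingIgnoredDuringExecution": [{
//						"labelSelector": {"matchLabels": {"foo": "bar"}},
//						"topologyKey": "zone"
//					}]
//				}}`,
//			},
//		},
//	}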
// NodeMatchesHardPodAntiAffinity checks whether scheduling the pod onto the
// given node would violate any required pod anti-affinity scheduling rules:
// neither the pod's own anti-affinity terms (checked against the existing
// pods) nor any existing pod's anti-affinity terms (checked against this
// pod) may be broken. It returns true only if both checks pass.
func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAntiAffinity *api.PodAntiAffinity) bool {
var podAntiAffinityTerms []api.PodAffinityTerm
if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAntiAffinityTerms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAntiAffinityTerms = append(podAntiAffinityTerms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
// For each podAntiAffinityTerm: if any existing pod matches the term
// (which would break the anti-affinity), don't schedule the pod onto this node.
for _, podAntiAffinityTerm := range podAntiAffinityTerms {
podAntiAffinityTermMatches, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAntiAffinityTerm)
if err != nil || podAntiAffinityTermMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because an existing pod matches the PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, podAntiAffinityTerm, err)
return false
}
}
// Check if scheduling the pod onto this node would break
// any anti-affinity rules indicated by the existing pods on the node.
// If it would break, system should not schedule pod onto this node.
for _, ep := range allPods {
epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
if err != nil {
glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
return false
}
if epAffinity.PodAntiAffinity != nil {
var epAntiAffinityTerms []api.PodAffinityTerm
if len(epAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
epAntiAffinityTerms = epAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
//if len(epAffinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// epAntiAffinityTerms = append(epAntiAffinityTerms, epAffinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, epAntiAffinityTerm := range epAntiAffinityTerms {
labelSelector, err := unversioned.LabelSelectorAsSelector(epAntiAffinityTerm.LabelSelector)
if err != nil {
glog.V(10).Infof("Failed to get label selector from anti-affinityterm %+v of existing pod %+v, err: %+v", epAntiAffinityTerm, podName(pod), err)
return false
}
names := priorityutil.GetNamespacesFromPodAffinityTerm(ep, epAntiAffinityTerm)
if (len(names) == 0 || names.Has(pod.Namespace)) && labelSelector.Matches(labels.Set(pod.Labels)) {
epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
if err != nil || checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, epAntiAffinityTerm.TopologyKey) {
glog.V(10).Infof("Cannot schedule Pod %+v, onto node %v because the pod would break the PodAntiAffinityTerm %+v, of existing pod %+v, err: %v",
podName(pod), node.Name, epAntiAffinityTerm, podName(ep), err)
return false
}
}
}
}
}
// all the required pod anti-affinity scheduling rules are satisfied
glog.V(10).Infof("Can schedule Pod %+v, on node %v because all the required pod anti-affinity scheduling rules are satisfied", podName(pod), node.Name)
return true
}
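// Illustration of the symmetry check above (a sketch; the values are
// illustrative): an existing pod carrying this anti-affinity repels any
// incoming pod labeled "service": "securityscan" from every node that shares
// its "zone" label value, even if the incoming pod declares no rules at all.
//
//	existingPod := &api.Pod{
//		Spec: api.PodSpec{NodeName: "machine1"},
//		ObjectMeta: api.ObjectMeta{
//			Labels: map[string]string{"service": "securityscan"},
//			Annotations: map[string]string{
//				api.AffinityAnnotationKey: `
//				{"podAntiAffinity": {
//					"requiredDuringSchedulingIgnoredDuringExecution": [{
//						"labelSelector": {"matchLabels": {"service": "securityscan"}},
//						"topologyKey": "zone"
//					}]
//				}}`,
//			},
//		},
//	}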
// NodeMatchPodAffinityAntiAffinity checks if the node matches
// the requiredDuringScheduling affinity/anti-affinity rules indicated by the pod.
func (checker *PodAffinityChecker) NodeMatchPodAffinityAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node) bool {
// Parse required affinity scheduling rules.
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
return false
}
// check if the current node matches the inter-pod affinity scheduling rules.
if affinity.PodAffinity != nil {
if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) {
return false
}
}
// check if the current node matches the inter-pod anti-affinity scheduling rules.
if affinity.PodAntiAffinity != nil {
if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, affinity.PodAntiAffinity) {
return false
}
}
return true
}
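// Callers can tell a clean predicate mismatch apart from an infrastructure
// error; a sketch (checker constructed as in NewPodAffinityPredicate):
//
//	fits, err := checker.InterPodAffinityMatches(pod, nodeName, nodeInfo)
//	if !fits && err == ErrPodAffinityNotMatch {
//		// Rejected by affinity/anti-affinity rules; try the next node.
//	}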

View File

@@ -20,12 +20,14 @@ import (
"fmt"
"os/exec"
"reflect"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/util/codeinspector"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@@ -1564,3 +1566,673 @@ func TestRunGeneralPredicates(t *testing.T) {
}
}
}
func TestInterPodAffinity(t *testing.T) {
podLabel := map[string]string{"service": "securityscan"}
labels1 := map[string]string{
"region": "r1",
"zone": "z11",
}
podLabel2 := map[string]string{"security": "S1"}
node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}}
tests := []struct {
pod *api.Pod
pods []*api.Pod
node api.Node
fits bool
test string
}{
{
pod: new(api.Pod),
node: node1,
fits: true,
test: "A pod that has no required pod affinity scheduling rules can schedule onto a node with no existing pods",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: true,
test: "satisfies with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using In operator that matches the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "NotIn",
"values": ["securityscan3", "value3"]
}]
},
"topologyKey": "region"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: true,
test: "satisfies the pod with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using not in operator in labelSelector that matches the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"namespaces":["DiffNameSpace"]
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel, Namespace: "ns"}}},
node: node1,
fits: false,
test: "Does not satisfy the PodAffinity with labelSelector because of diff Namespace",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["antivirusscan", "value2"]
}]
}
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: false,
test: "Doesn't satisfy the PodAffinity because of unmatching labelSelector with the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [
{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "Exists"
}, {
"key": "wrongkey",
"operator": "DoesNotExist"
}]
},
"topologyKey": "region"
}, {
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan"]
}, {
"key": "service",
"operator": "NotIn",
"values": ["WrongValue"]
}]
},
"topologyKey": "region"
}
]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: true,
test: "satisfies the PodAffinity with different label Operators in multiple RequiredDuringSchedulingIgnoredDuringExecution ",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [
{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "Exists"
}, {
"key": "wrongkey",
"operator": "DoesNotExist"
}]
},
"topologyKey": "region"
}, {
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan2"]
}, {
"key": "service",
"operator": "NotIn",
"values": ["WrongValue"]
}]
},
"topologyKey": "region"
}
]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: false,
test: "The labelSelector requirements(items of matchExpressions) are ANDed, the pod cannot schedule onto the node becasue one of the matchExpression item don't match.",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
},
"podAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["antivirusscan", "value2"]
}]
},
"topologyKey": "node"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: true,
test: "satisfies the PodAffinity and PodAntiAffinity with the existing pod",
},
// TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
//{
// pod: &api.Pod{
// ObjectMeta: api.ObjectMeta{
// Labels: podLabel2,
// Annotations: map[string]string{
// api.AffinityAnnotationKey: `
// {"podAffinity": {
// "requiredDuringSchedulingRequiredDuringExecution": [
// {
// "labelSelector": {
// "matchExpressions": [{
// "key": "service",
// "operator": "Exists"
// }, {
// "key": "wrongkey",
// "operator": "DoesNotExist"
// }]
// },
// "topologyKey": "region"
// }, {
// "labelSelector": {
// "matchExpressions": [{
// "key": "service",
// "operator": "In",
// "values": ["securityscan"]
// }, {
// "key": "service",
// "operator": "NotIn",
// "values": ["WrongValue"]
// }]
// },
// "topologyKey": "region"
// }
// ]
// }}`,
// },
// },
// },
// pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
// node: node1,
// fits: true,
// test: "satisfies the PodAffinity with different Label Operators in multiple RequiredDuringSchedulingRequiredDuringExecution ",
//},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
},
"podAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["antivirusscan", "value2"]
}]
},
"topologyKey": "node"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"},
ObjectMeta: api.ObjectMeta{Labels: podLabel,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"PodAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["antivirusscan", "value2"]
}]
},
"topologyKey": "node"
}]
}}`,
}},
}},
node: node1,
fits: true,
test: "satisfies the PodAffinity and PodAntiAffinity and PodAntiAffinity symmetry with the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel2,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
},
"podAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "zone"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: false,
test: "satisfies the PodAffinity but doesn't satisfies the PodAntiAffinity with the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
},
"podAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["antivirusscan", "value2"]
}]
},
"topologyKey": "node"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"},
ObjectMeta: api.ObjectMeta{Labels: podLabel,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"PodAntiAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "In",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "zone"
}]
}}`,
}},
}},
node: node1,
fits: false,
test: "satisfies the PodAffinity and PodAntiAffinity but doesn't satisfies PodAntiAffinity symmetry with the existing pod",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: podLabel,
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "service",
"operator": "NotIn",
"values": ["securityscan", "value2"]
}]
},
"topologyKey": "region"
}]
}}`,
},
},
},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: podLabel}}},
node: node1,
fits: false,
test: "pod matches its own Label in PodAffinity and that matches the existing pod Labels",
},
}
for _, test := range tests {
node := test.node
var podsOnNode []*api.Pod
for _, pod := range test.pods {
if pod.Spec.NodeName == node.Name {
podsOnNode = append(podsOnNode, pod)
}
}
fit := PodAffinityChecker{
info: FakeNodeInfo(node),
podLister: algorithm.FakePodLister(test.pods),
failureDomains: priorityutil.Topologies{DefaultKeys: strings.Split(api.DefaultFailureDomains, ",")},
}
fits, err := fit.InterPodAffinityMatches(test.pod, test.node.Name, schedulercache.NewNodeInfo(podsOnNode...))
if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
t.Errorf("%s: unexpected error %v", test.test, err)
}
if fits != test.fits {
t.Errorf("%s: expected %v got %v", test.test, test.fits, fits)
}
}
}
func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
podLabelA := map[string]string{
"foo": "bar",
}
labelRgChina := map[string]string{
"region": "China",
}
labelRgChinaAzAz1 := map[string]string{
"region": "China",
"az": "az1",
}
labelRgIndia := map[string]string{
"region": "India",
}
tests := []struct {
pod *api.Pod
pods []*api.Pod
nodes []api.Node
fits map[string]bool
test string
}{
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "foo",
"operator": "In",
"values": ["bar"]
}]
},
"topologyKey": "region"
}]
}}`,
},
},
},
pods: []*api.Pod{
{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: podLabelA}},
},
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labelRgChinaAzAz1}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labelRgIndia}},
},
fits: map[string]bool{
"machine1": true,
"machine2": true,
"machine3": false,
},
test: "A pod can be scheduled onto all the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "hostname",
"operator": "NotIn",
"values": ["h1"]
}]
}]
}
},
"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "foo",
"operator": "In",
"values": ["abc"]
}]
},
"topologyKey": "region"
}]
}
}`,
},
},
},
pods: []*api.Pod{
{Spec: api.PodSpec{NodeName: "nodeA"}, ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "abc"}}},
{Spec: api.PodSpec{NodeName: "nodeB"}, ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "def"}}},
},
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "nodeA", Labels: map[string]string{"region": "r1", "hostname": "h1"}}},
{ObjectMeta: api.ObjectMeta{Name: "nodeB", Labels: map[string]string{"region": "r1", "hostname": "h2"}}},
},
fits: map[string]bool{
"nodeA": false,
"nodeB": true,
},
test: "NodeA and nodeB have same topologyKey and label value. NodeA does not satisfy node affinity rule, but has an existing pod that match the inter pod affinity rule. The pod can be scheduled onto nodeB.",
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
Annotations: map[string]string{
api.AffinityAnnotationKey: `
{"podAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": [{
"labelSelector": {
"matchExpressions": [{
"key": "foo",
"operator": "In",
"values": ["bar"]
}]
},
"topologyKey": "zone"
}]
}}`,
},
},
},
pods: []*api.Pod{},
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "nodeA", Labels: map[string]string{"zone": "az1", "hostname": "h1"}}},
{ObjectMeta: api.ObjectMeta{Name: "nodeB", Labels: map[string]string{"zone": "az2", "hostname": "h2"}}},
},
fits: map[string]bool{
"nodeA": true,
"nodeB": true,
},
test: "The affinity rule is to schedule all of the pods of this collection to the same zone. The first pod of the collection " +
"should not be blocked from being scheduled onto any node, even there's no existing pod that match the rule anywhere.",
},
}
for _, test := range tests {
nodeListInfo := FakeNodeListInfo(test.nodes)
for _, node := range test.nodes {
var podsOnNode []*api.Pod
for _, pod := range test.pods {
if pod.Spec.NodeName == node.Name {
podsOnNode = append(podsOnNode, pod)
}
}
testFit := PodAffinityChecker{
info: nodeListInfo,
podLister: algorithm.FakePodLister(test.pods),
failureDomains: priorityutil.Topologies{DefaultKeys: strings.Split(api.DefaultFailureDomains, ",")},
}
fits, err := testFit.InterPodAffinityMatches(test.pod, node.Name, schedulercache.NewNodeInfo(podsOnNode...))
if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
t.Errorf("%s: unexpected error %v", test.test, err)
}
affinity, err := api.GetAffinityFromPodAnnotations(test.pod.ObjectMeta.Annotations)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if affinity.NodeAffinity != nil {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)
fits2, err := PodSelectorMatches(test.pod, node.Name, nodeInfo)
if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
t.Errorf("unexpected error: %v", err)
}
fits = fits && fits2
}
if fits != test.fits[node.Name] {
t.Errorf("%s: expected %v for %s got %v", test.test, test.fits[node.Name], node.Name, fits)
}
}
}
}