diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go
index 066a9dca7be..4af5748aa9d 100644
--- a/pkg/controller/node/cidr_allocator.go
+++ b/pkg/controller/node/cidr_allocator.go
@@ -17,144 +17,175 @@ limitations under the License.
 package node

 import (
-	"encoding/binary"
 	"errors"
 	"fmt"
-	"math/big"
 	"net"
-	"sync"
+
+	"k8s.io/kubernetes/pkg/api"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/util/wait"
+
+	"github.com/golang/glog"
+)
+
+// TODO: figure out good settings for these constants.
+const (
+	// controls how many NodeSpec updates NC can process concurrently.
+	cidrUpdateWorkers   = 10
+	cidrUpdateQueueSize = 5000
+	// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
+	podCIDRUpdateRetry = 5
 )

 var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")

+type nodeAndCIDR struct {
+	cidr     *net.IPNet
+	nodeName string
+}
+
 // CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDR for nodes.
 type CIDRAllocator interface {
-	AllocateNext() (*net.IPNet, error)
-	Occupy(*net.IPNet) error
-	Release(*net.IPNet) error
+	AllocateOrOccupyCIDR(node *api.Node) error
+	ReleaseCIDR(node *api.Node) error
 }

 type rangeAllocator struct {
-	clusterCIDR     *net.IPNet
-	clusterIP       net.IP
-	clusterMaskSize int
-	subNetMaskSize  int
-	maxCIDRs        int
-	used            big.Int
-	lock            sync.Mutex
-	nextCandidate   int
+	client      clientset.Interface
+	cidrs       *cidrSet
+	clusterCIDR *net.IPNet
+	maxCIDRs    int
+	// Channel that is used to pass updating Nodes with assigned CIDRs to the background.
+	// This increases the throughput of CIDR assignment by not blocking on long operations.
+	nodeCIDRUpdateChannel chan nodeAndCIDR
+	recorder              record.EventRecorder
 }

 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
 // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
-func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
-	clusterMask := clusterCIDR.Mask
-	clusterMaskSize, _ := clusterMask.Size()
+func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
+	eventBroadcaster.StartLogging(glog.Infof)
 	ra := &rangeAllocator{
-		clusterCIDR:     clusterCIDR,
-		clusterIP:       clusterCIDR.IP.To4(),
-		clusterMaskSize: clusterMaskSize,
-		subNetMaskSize:  subNetMaskSize,
-		maxCIDRs:        1 << uint32(subNetMaskSize-clusterMaskSize),
-		nextCandidate:   0,
+		client:                client,
+		cidrs:                 newCIDRSet(clusterCIDR, subNetMaskSize),
+		clusterCIDR:           clusterCIDR,
+		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
+		recorder:              recorder,
 	}
+
+	if serviceCIDR != nil {
+		ra.filterOutServiceRange(serviceCIDR)
+	} else {
+		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
+	}
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		go func(stopChan <-chan struct{}) {
+			for {
+				select {
+				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
+					if !ok {
+						glog.Warning("NodeCIDRUpdateChannel read returned false.")
+						return
+					}
+					ra.updateCIDRAllocation(workItem)
+				case <-stopChan:
+					return
+				}
+			}
+		}(wait.NeverStop)
+	}
+	return ra
 }

-func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
+// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
+// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
+func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR != "" {
+		_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+		if err != nil {
+			return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+		}
+		if err := r.cidrs.occupy(podCIDR); err != nil {
+			return fmt.Errorf("failed to mark cidr as occupied: %v", err)
+		}
+		return nil
+	}
+	podCIDR, err := r.cidrs.allocateNext()
+	if err != nil {
+		recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
+		return fmt.Errorf("failed to allocate cidr: %v", err)
+	}

-	nextUnused := -1
-	for i := 0; i < r.maxCIDRs; i++ {
-		candidate := (i + r.nextCandidate) % r.maxCIDRs
-		if r.used.Bit(candidate) == 0 {
-			nextUnused = candidate
+	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
+		nodeName: node.Name,
+		cidr:     podCIDR,
+	}
+	return nil
+}
+
+// ReleaseCIDR releases the CIDR of the removed node
+func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR == "" {
+		return nil
+	}
+	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+	if err != nil {
+		return fmt.Errorf("Failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+	}
+
+	glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
+	if err = r.cidrs.release(podCIDR); err != nil {
+		return fmt.Errorf("Failed to release cidr: %v", err)
+	}
+	return err
+}
+
+// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used, so that they won't be
+// assignable.
+func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
+	// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
+	// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
+	// or vice versa (which means that serviceCIDR contains clusterCIDR).
+	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
+		return
+	}
+
+	if err := r.cidrs.occupy(serviceCIDR); err != nil {
+		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
+	}
+}
+
+// Assigns CIDR to Node and sends an update to the API server.
+func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
+	var err error
+	var node *api.Node
+	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
+		// TODO: change it to using PATCH instead of full Node updates.
+ node, err = r.client.Core().Nodes().Get(data.nodeName) + glog.Infof("Got Node: %v", node) + if err != nil { + glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err) + continue + } + node.Spec.PodCIDR = data.cidr.String() + if _, err := r.client.Core().Nodes().Update(node); err != nil { + glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err) + } else { break } } - if nextUnused == -1 { - return nil, errCIDRRangeNoCIDRsRemaining - } - r.nextCandidate = (nextUnused + 1) % r.maxCIDRs - - r.used.SetBit(&r.used, nextUnused, 1) - - j := uint32(nextUnused) << uint32(32-r.subNetMaskSize) - ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j - ip := make([]byte, 4) - binary.BigEndian.PutUint32(ip, ipInt) - - return &net.IPNet{ - IP: ip, - Mask: net.CIDRMask(r.subNetMaskSize, 32), - }, nil -} - -func (r *rangeAllocator) Release(cidr *net.IPNet) error { - used, err := r.getIndexForCIDR(cidr) if err != nil { - return err - } - - r.lock.Lock() - defer r.lock.Unlock() - r.used.SetBit(&r.used, used, 0) - - return nil -} - -func (r *rangeAllocator) MaxCIDRs() int { - return r.maxCIDRs -} - -func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) { - begin, end := 0, r.maxCIDRs - cidrMask := cidr.Mask - maskSize, _ := cidrMask.Size() - - if !r.clusterCIDR.Contains(cidr.IP.Mask(r.clusterCIDR.Mask)) && !cidr.Contains(r.clusterCIDR.IP.Mask(cidr.Mask)) { - return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, r.clusterCIDR) - } - - if r.clusterMaskSize < maskSize { - subNetMask := net.CIDRMask(r.subNetMaskSize, 32) - begin, err = r.getIndexForCIDR(&net.IPNet{ - IP: cidr.IP.To4().Mask(subNetMask), - Mask: subNetMask, - }) - if err != nil { - return err - } - - ip := make([]byte, 4) - ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask)) - binary.BigEndian.PutUint32(ip, ipInt) - end, err = r.getIndexForCIDR(&net.IPNet{ - IP: net.IP(ip).To4().Mask(subNetMask), - Mask: subNetMask, - }) - if err != nil { - return err + recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) + if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { + glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) } } - - r.lock.Lock() - defer r.lock.Unlock() - for i := begin; i <= end; i++ { - r.used.SetBit(&r.used, i, 1) - } - - return nil -} - -func (r *rangeAllocator) getIndexForCIDR(cidr *net.IPNet) (int, error) { - cidrIndex := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize) - - if cidrIndex >= uint32(r.maxCIDRs) { - return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) - } - - return int(cidrIndex), nil + return err } diff --git a/pkg/controller/node/cidr_allocator_test.go b/pkg/controller/node/cidr_allocator_test.go index 8fd75788546..4d946fd60f6 100644 --- a/pkg/controller/node/cidr_allocator_test.go +++ b/pkg/controller/node/cidr_allocator_test.go @@ -17,334 +17,380 @@ limitations under the License. 
package node import ( - "github.com/golang/glog" - "math/big" "net" - "reflect" "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/util/wait" ) -func TestRangeAllocatorFullyAllocated(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30") - a := NewCIDRRangeAllocator(clusterCIDR, 30) - p, err := a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p.String() != "127.123.234.0/30" { - t.Fatalf("unexpected allocated cidr: %s", p.String()) - } +const ( + nodePollInterval = 100 * time.Millisecond +) - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - a.Release(p) - p, err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p.String() != "127.123.234.0/30" { - t.Fatalf("unexpected allocated cidr: %s", p.String()) - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } +func waitForUpdatedNodeWithTimeout(nodeHandler *FakeNodeHandler, number int, timeout time.Duration) error { + return wait.Poll(nodePollInterval, timeout, func() (bool, error) { + if len(nodeHandler.getUpdatedNodesCopy()) >= number { + return true, nil + } + return false, nil + }) } -func TestRangeAllocator_RandomishAllocation(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") - a := NewCIDRRangeAllocator(clusterCIDR, 24) - - // allocate all the CIDRs - var err error - cidrs := make([]*net.IPNet, 256) - - for i := 0; i < 256; i++ { - cidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - // release them all - for i := 0; i < 256; i++ { - a.Release(cidrs[i]) - } - - // allocate the CIDRs again - rcidrs := make([]*net.IPNet, 256) - for i := 0; i < 256; i++ { - rcidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %d, %v", i, err) - } - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - if !reflect.DeepEqual(cidrs, rcidrs) { - t.Fatalf("expected re-allocated cidrs are the same collection") - } -} - -func TestRangeAllocator_AllocationOccupied(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") - a := NewCIDRRangeAllocator(clusterCIDR, 24) - - // allocate all the CIDRs - var err error - cidrs := make([]*net.IPNet, 256) - - for i := 0; i < 256; i++ { - cidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - // release them all - for i := 0; i < 256; i++ { - a.Release(cidrs[i]) - } - // occupy the last 128 CIDRs - for i := 128; i < 256; i++ { - a.Occupy(cidrs[i]) - } - - // allocate the first 128 CIDRs again - rcidrs := make([]*net.IPNet, 128) - for i := 0; i < 128; i++ { - rcidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %d, %v", i, err) - } - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - // check Occupy() work properly - for i := 128; i < 256; i++ { - rcidrs = append(rcidrs, cidrs[i]) - } - if !reflect.DeepEqual(cidrs, rcidrs) { - t.Fatalf("expected re-allocated cidrs are the same 
collection") - } -} - -func TestGetBitforCIDR(t *testing.T) { - cases := []struct { - clusterCIDRStr string - subNetMaskSize int - subNetCIDRStr string - expectedBit int - expectErr bool +func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDR string + allocatedCIDRs []string }{ { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/16", - expectedBit: 0, - expectErr: false, + description: "When there's no ServiceCIDR return first CIDR in range", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + expectedAllocatedCIDR: "127.123.234.0/30", }, { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.123.0.0/16", - expectedBit: 123, - expectErr: false, + description: "Correctly filter out ServiceCIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return clusterCIDR + }(), + subNetMaskSize: 30, + // it should return first /30 CIDR after service range + expectedAllocatedCIDR: "127.123.234.64/30", }, { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.168.0.0/16", - expectedBit: 168, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.224.0.0/16", - expectedBit: 224, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "192.168.12.0/24", - expectedBit: 12, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "192.168.151.0/24", - expectedBit: 151, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "127.168.224.0/24", - expectErr: true, + description: "Correctly ignore already allocated CIDRs", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return clusterCIDR + }(), + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.64/30", "127.123.234.68/30", "127.123.234.72/30", "127.123.234.80/30"}, + expectedAllocatedCIDR: "127.123.234.76/30", }, } - for _, tc := range cases { - _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) - clusterMask := clusterCIDR.Mask - clusterMaskSize, _ := clusterMask.Size() - if err != nil { - t.Fatalf("unexpected error: %v", err) + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDR string + allocatedCIDRs []string + }) { + allocator 
:= NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + if err = rangeAllocator.cidrs.occupy(cidr); err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } + if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { + t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + } + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) + } + found := false + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDR { + found = true + break + } + } + if !found { + t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", + tc.description, tc.expectedAllocatedCIDR, seenCIDRs) + } + } - ra := &rangeAllocator{ - clusterIP: clusterCIDR.IP.To4(), - clusterMaskSize: clusterMaskSize, - subNetMaskSize: tc.subNetMaskSize, - maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize), - } - - _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - got, err := ra.getIndexForCIDR(subnetCIDR) - if err == nil && tc.expectErr { - glog.Errorf("expected error but got null") - continue - } - - if err != nil && !tc.expectErr { - glog.Errorf("unexpected error: %v", err) - continue - } - - if got != tc.expectedBit { - glog.Errorf("expected %v, but got %v", tc.expectedBit, got) - } + for _, tc := range testCases { + testFunc(tc) } } -func TestOccupy(t *testing.T) { - cases := []struct { - clusterCIDRStr string - subNetMaskSize int - subNetCIDRStr string - expectedUsedBegin int - expectedUsedEnd int - expectErr bool +func TestAllocateOrOccupyCIDRFailure(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + allocatedCIDRs []string }{ { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/8", - expectedUsedBegin: 0, - expectedUsedEnd: 256, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/2", - expectedUsedBegin: 0, - expectedUsedEnd: 256, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/16", - expectedUsedBegin: 0, - expectedUsedEnd: 0, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 32, - subNetCIDRStr: "127.0.0.0/16", - expectedUsedBegin: 0, - expectedUsedEnd: 65535, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 256, - expectedUsedEnd: 257, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 15, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 128, - 
expectedUsedEnd: 128, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 18, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 1024, - expectedUsedEnd: 1031, - expectErr: false, + description: "When there's no ServiceCIDR return first CIDR in range", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, }, } - for _, tc := range cases { - _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + allocatedCIDRs []string + }) { + allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + err = rangeAllocator.cidrs.occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } - clusterMask := clusterCIDR.Mask - clusterMaskSize, _ := clusterMask.Size() - - ra := &rangeAllocator{ - clusterCIDR: clusterCIDR, - clusterIP: clusterCIDR.IP.To4(), - clusterMaskSize: clusterMaskSize, - subNetMaskSize: tc.subNetMaskSize, - maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize), + if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err == nil { + t.Errorf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err) } - - _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) + // We don't expect any updates, so just sleep for some time + time.Sleep(time.Second) + if len(tc.fakeNodeHandler.getUpdatedNodesCopy()) != 0 { + t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.getUpdatedNodesCopy()) } - - err = ra.Occupy(subnetCIDR) - if err == nil && tc.expectErr { - t.Errorf("expected error but got none") - continue + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + if updatedNode.Spec.PodCIDR != "" { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + } } - if err != nil && !tc.expectErr { - t.Errorf("unexpected error: %v", err) - continue - } - - expectedUsed := big.Int{} - for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ { - expectedUsed.SetBit(&expectedUsed, i, 1) - } - if expectedUsed.Cmp(&ra.used) != 0 { - t.Errorf("error") + if len(seenCIDRs) != 0 { + t.Errorf("%v: Seen assigned CIDRs when not expected: %v", + tc.description, seenCIDRs) } } + for _, tc := range testCases { + testFunc(tc) + } +} + +func TestReleaseCIDRSuccess(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet 
+ serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDRFirstRound string + expectedAllocatedCIDRSecondRound string + allocatedCIDRs []string + cidrsToRelease []string + }{ + { + description: "Correctly release preallocated CIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + expectedAllocatedCIDRFirstRound: "", + cidrsToRelease: []string{"127.123.234.4/30"}, + expectedAllocatedCIDRSecondRound: "127.123.234.4/30", + }, + { + description: "Correctly recycle CIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + expectedAllocatedCIDRFirstRound: "127.123.234.0/30", + cidrsToRelease: []string{"127.123.234.0/30"}, + expectedAllocatedCIDRSecondRound: "127.123.234.0/30", + }, + } + + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDRFirstRound string + expectedAllocatedCIDRSecondRound string + allocatedCIDRs []string + cidrsToRelease []string + }) { + allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + err = rangeAllocator.cidrs.occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } + } + err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]) + if tc.expectedAllocatedCIDRFirstRound != "" { + if err != nil { + t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + } + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) + } + } else { + if err == nil { + t.Fatalf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err) + } + // We don't expect any updates here + time.Sleep(time.Second) + if len(tc.fakeNodeHandler.getUpdatedNodesCopy()) != 0 { + t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.getUpdatedNodesCopy()) + } + } + + for _, cidrToRelease := range tc.cidrsToRelease { + nodeToRelease := api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + } + nodeToRelease.Spec.PodCIDR = cidrToRelease + err = allocator.ReleaseCIDR(&nodeToRelease) + if err != nil { + t.Fatalf("%v: unexpected error in ReleaseCIDR: %v", tc.description, err) + } + } + + if err = 
allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { + t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + } + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) + } + + found := false + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDRSecondRound { + found = true + break + } + } + if !found { + t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", + tc.description, tc.expectedAllocatedCIDRSecondRound, seenCIDRs) + } + } + for _, tc := range testCases { + testFunc(tc) + } } diff --git a/pkg/controller/node/cidr_set.go b/pkg/controller/node/cidr_set.go new file mode 100644 index 00000000000..353c4f0161c --- /dev/null +++ b/pkg/controller/node/cidr_set.go @@ -0,0 +1,142 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "encoding/binary" + "fmt" + "math/big" + "net" + "sync" +) + +type cidrSet struct { + sync.Mutex + clusterCIDR *net.IPNet + clusterIP net.IP + clusterMaskSize int + maxCIDRs int + nextCandidate int + used big.Int + subNetMaskSize int +} + +func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet { + clusterMask := clusterCIDR.Mask + clusterMaskSize, _ := clusterMask.Size() + maxCIDRs := 1 << uint32(subNetMaskSize-clusterMaskSize) + return &cidrSet{ + clusterCIDR: clusterCIDR, + clusterIP: clusterCIDR.IP.To4(), + clusterMaskSize: clusterMaskSize, + maxCIDRs: maxCIDRs, + subNetMaskSize: subNetMaskSize, + } +} + +func (s *cidrSet) allocateNext() (*net.IPNet, error) { + s.Lock() + defer s.Unlock() + + nextUnused := -1 + for i := 0; i < s.maxCIDRs; i++ { + candidate := (i + s.nextCandidate) % s.maxCIDRs + if s.used.Bit(candidate) == 0 { + nextUnused = candidate + break + } + } + if nextUnused == -1 { + return nil, errCIDRRangeNoCIDRsRemaining + } + s.nextCandidate = (nextUnused + 1) % s.maxCIDRs + + s.used.SetBit(&s.used, nextUnused, 1) + + j := uint32(nextUnused) << uint32(32-s.subNetMaskSize) + ipInt := (binary.BigEndian.Uint32(s.clusterIP)) | j + ip := make([]byte, 4) + binary.BigEndian.PutUint32(ip, ipInt) + + return &net.IPNet{ + IP: ip, + Mask: net.CIDRMask(s.subNetMaskSize, 32), + }, nil +} + +func (s *cidrSet) release(cidr *net.IPNet) error { + used, err := s.getIndexForCIDR(cidr) + if err != nil { + return err + } + + s.Lock() + defer s.Unlock() + s.used.SetBit(&s.used, used, 0) + + return nil +} + +func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { + begin, end := 0, s.maxCIDRs + cidrMask := cidr.Mask + maskSize, _ := cidrMask.Size() + + if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) { + return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, 
s.clusterCIDR) + } + + if s.clusterMaskSize < maskSize { + subNetMask := net.CIDRMask(s.subNetMaskSize, 32) + begin, err = s.getIndexForCIDR(&net.IPNet{ + IP: cidr.IP.To4().Mask(subNetMask), + Mask: subNetMask, + }) + if err != nil { + return err + } + + ip := make([]byte, 4) + ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask)) + binary.BigEndian.PutUint32(ip, ipInt) + end, err = s.getIndexForCIDR(&net.IPNet{ + IP: net.IP(ip).To4().Mask(subNetMask), + Mask: subNetMask, + }) + if err != nil { + return err + } + } + + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + s.used.SetBit(&s.used, i, 1) + } + + return nil +} + +func (s *cidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) { + cidrIndex := (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize) + + if cidrIndex >= uint32(s.maxCIDRs) { + return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) + } + + return int(cidrIndex), nil +} diff --git a/pkg/controller/node/cidr_set_test.go b/pkg/controller/node/cidr_set_test.go new file mode 100644 index 00000000000..f3e7165f052 --- /dev/null +++ b/pkg/controller/node/cidr_set_test.go @@ -0,0 +1,335 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "github.com/golang/glog" + "math/big" + "net" + "reflect" + "testing" +) + +func TestCIDRSetFullyAllocated(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30") + a := newCIDRSet(clusterCIDR, 30) + + p, err := a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + a.release(p) + p, err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } +} + +func TestCIDRSet_RandomishAllocation(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := newCIDRSet(clusterCIDR, 24) + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.release(cidrs[i]) + } + + // allocate the CIDRs again + rcidrs := make([]*net.IPNet, 256) + for i := 0; i < 256; i++ { + rcidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestCIDRSet_AllocationOccupied(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := newCIDRSet(clusterCIDR, 24) + + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.release(cidrs[i]) + } + // occupy the last 128 CIDRs + for i := 128; i < 256; i++ { + a.occupy(cidrs[i]) + } + + // allocate the first 128 CIDRs again + rcidrs := make([]*net.IPNet, 128) + for i := 0; i < 128; i++ { + rcidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + // check Occupy() work properly + for i := 128; i < 256; i++ { + rcidrs = append(rcidrs, cidrs[i]) + } + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestGetBitforCIDR(t *testing.T) { + cases := []struct { + clusterCIDRStr string + subNetMaskSize int + subNetCIDRStr string + expectedBit int + expectErr bool + }{ + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/16", + expectedBit: 0, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.123.0.0/16", + expectedBit: 123, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, 
+ subNetCIDRStr: "127.168.0.0/16", + expectedBit: 168, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.224.0.0/16", + expectedBit: 224, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.12.0/24", + expectedBit: 12, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.151.0/24", + expectedBit: 151, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "127.168.224.0/24", + expectErr: true, + }, + } + + for _, tc := range cases { + _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + + _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got, err := cs.getIndexForCIDR(subnetCIDR) + if err == nil && tc.expectErr { + glog.Errorf("expected error but got null") + continue + } + + if err != nil && !tc.expectErr { + glog.Errorf("unexpected error: %v", err) + continue + } + + if got != tc.expectedBit { + glog.Errorf("expected %v, but got %v", tc.expectedBit, got) + } + } +} + +func TestOccupy(t *testing.T) { + cases := []struct { + clusterCIDRStr string + subNetMaskSize int + subNetCIDRStr string + expectedUsedBegin int + expectedUsedEnd int + expectErr bool + }{ + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/8", + expectedUsedBegin: 0, + expectedUsedEnd: 256, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/2", + expectedUsedBegin: 0, + expectedUsedEnd: 256, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/16", + expectedUsedBegin: 0, + expectedUsedEnd: 0, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 32, + subNetCIDRStr: "127.0.0.0/16", + expectedUsedBegin: 0, + expectedUsedEnd: 65535, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 256, + expectedUsedEnd: 257, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 15, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 128, + expectedUsedEnd: 128, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 18, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 1024, + expectedUsedEnd: 1031, + expectErr: false, + }, + } + + for _, tc := range cases { + _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + + _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + err = cs.occupy(subnetCIDR) + if err == nil && tc.expectErr { + t.Errorf("expected error but got none") + continue + } + if err != nil && !tc.expectErr { + t.Errorf("unexpected error: %v", err) + continue + } + + expectedUsed := big.Int{} + for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ { + expectedUsed.SetBit(&expectedUsed, i, 1) + } + if expectedUsed.Cmp(&cs.used) != 0 { + t.Errorf("error") + } + } +} diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index f36ac78e553..7a79b6e98c5 100644 --- 
a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -57,13 +57,8 @@ var ( const ( // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. nodeStatusUpdateRetry = 5 - // podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update. - podCIDRUpdateRetry = 5 // controls how often NodeController will try to evict Pods from non-responsive Nodes. nodeEvictionPeriod = 100 * time.Millisecond - // controls how many NodeSpec updates NC can process in any moment. - cidrUpdateWorkers = 10 - cidrUpdateQueueSize = 5000 ) type nodeStatusData struct { @@ -72,11 +67,6 @@ type nodeStatusData struct { status api.NodeStatus } -type nodeAndCIDR struct { - nodeName string - cidr *net.IPNet -} - type NodeController struct { allocateNodeCIDRs bool cloud cloudprovider.Interface @@ -142,8 +132,6 @@ type NodeController struct { // It is enabled when all Nodes observed by the NodeController are NotReady and disabled // when NC sees any healthy Node. This is a temporary fix for v1.3. networkSegmentationMode bool - - nodeCIDRUpdateChannel chan nodeAndCIDR } // NewNodeController returns a new node controller to sync instances from cloudprovider. @@ -206,7 +194,6 @@ func NewNodeController( allocateNodeCIDRs: allocateNodeCIDRs, forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) }, nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) }, - nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), } nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer( @@ -233,8 +220,20 @@ func NewNodeController( nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{} if nc.allocateNodeCIDRs { nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{ - AddFunc: nc.allocateOrOccupyCIDR, - DeleteFunc: nc.recycleCIDR, + AddFunc: func(obj interface{}) { + node := obj.(*api.Node) + err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + if err != nil { + glog.Errorf("Error allocating CIDR: %v", err) + } + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*api.Node) + err := nc.cidrAllocator.ReleaseCIDR(node) + if err != nil { + glog.Errorf("Error releasing CIDR: %v", err) + } + }, } } @@ -267,7 +266,7 @@ func NewNodeController( ) if allocateNodeCIDRs { - nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, nodeCIDRMaskSize) + nc.cidrAllocator = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) } return nc @@ -275,14 +274,6 @@ func NewNodeController( // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration) { - if nc.allocateNodeCIDRs { - if nc.serviceCIDR != nil { - nc.filterOutServiceRange() - } else { - glog.Info("No Service CIDR provided. 
Skipping filtering out service addresses.") - } - } - go nc.nodeController.Run(wait.NeverStop) go nc.podController.Run(wait.NeverStop) go nc.daemonSetController.Run(wait.NeverStop) @@ -351,107 +342,6 @@ func (nc *NodeController) Run(period time.Duration) { }, nodeEvictionPeriod, wait.NeverStop) go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop) - - for i := 0; i < cidrUpdateWorkers; i++ { - go func(stopChan <-chan struct{}) { - for { - select { - case workItem, ok := <-nc.nodeCIDRUpdateChannel: - if !ok { - glog.Warning("NodeCIDRUpdateChannel read returned false.") - return - } - nc.updateCIDRAllocation(workItem) - case <-stopChan: - glog.V(0).Info("StopChannel is closed.") - return - } - } - }(wait.NeverStop) - } -} - -func (nc *NodeController) filterOutServiceRange() { - if !nc.clusterCIDR.Contains(nc.serviceCIDR.IP.Mask(nc.clusterCIDR.Mask)) && !nc.serviceCIDR.Contains(nc.clusterCIDR.IP.Mask(nc.serviceCIDR.Mask)) { - return - } - - if err := nc.cidrAllocator.Occupy(nc.serviceCIDR); err != nil { - glog.Errorf("Error filtering out service cidr: %v", err) - } -} - -func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) { - var err error - var node *api.Node - for rep := 0; rep < podCIDRUpdateRetry; rep++ { - node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName) - if err != nil { - glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err) - continue - } - node.Spec.PodCIDR = data.cidr.String() - if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil { - glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err) - } else { - break - } - } - if err != nil { - nc.recordNodeStatusChange(node, "CIDRAssignmentFailed") - glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) - err := nc.cidrAllocator.Release(data.cidr) - glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, err) - } -} - -// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR -// if it doesn't currently have one or mark the CIDR as used if the node already have one. 
-func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) { - node := obj.(*api.Node) - if node.Spec.PodCIDR != "" { - _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) - if err != nil { - glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) - return - } - if err := nc.cidrAllocator.Occupy(podCIDR); err != nil { - glog.Errorf("failed to mark cidr as occupied :%v", err) - return - } - return - } - podCIDR, err := nc.cidrAllocator.AllocateNext() - if err != nil { - nc.recordNodeStatusChange(node, "CIDRNotAvailable") - return - } - - glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR) - nc.nodeCIDRUpdateChannel <- nodeAndCIDR{ - nodeName: node.Name, - cidr: podCIDR, - } -} - -// recycleCIDR recycles the CIDR of a removed node -func (nc *NodeController) recycleCIDR(obj interface{}) { - node := obj.(*api.Node) - - if node.Spec.PodCIDR == "" { - return - } - - _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) - if err != nil { - glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) - return - } - - glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR) - if err := nc.cidrAllocator.Release(podCIDR); err != nil { - glog.Errorf("failed to release cidr: %v", err) - } } var gracefulDeletionVersion = version.MustParse("v1.1.0") @@ -625,7 +515,7 @@ func (nc *NodeController) monitorNodeStatus() error { // Report node event. if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue { - nc.recordNodeStatusChange(node, "NodeNotReady") + recordNodeStatusChange(nc.recorder, node, "NodeNotReady") if err = nc.markAllPodsNotReady(node.Name); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } @@ -721,7 +611,7 @@ func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event str nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } -func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) { +func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) { ref := &api.ObjectReference{ Kind: "Node", Name: node.Name, @@ -731,7 +621,7 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - nc.recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) + recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) } // For a given node checks its conditions and tries to update it. Returns grace period to which given node diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 9791fe4d60c..8a3d392099a 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -17,24 +17,19 @@ limitations under the License. 
package node import ( - "errors" - "sync" "testing" "time" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -43,133 +38,6 @@ const ( testNodeMonitorPeriod = 5 * time.Second ) -// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It -// allows test cases to have fine-grained control over mock behaviors. We also need -// PodsInterface and PodInterface to test list & delet pods, which is implemented in -// the embedded client.Fake field. -type FakeNodeHandler struct { - *fake.Clientset - - // Input: Hooks determine if request is valid or not - CreateHook func(*FakeNodeHandler, *api.Node) bool - Existing []*api.Node - - // Output - CreatedNodes []*api.Node - DeletedNodes []*api.Node - UpdatedNodes []*api.Node - UpdatedNodeStatuses []*api.Node - RequestCount int - - // Synchronization - createLock sync.Mutex - deleteWaitChan chan struct{} -} - -type FakeLegacyHandler struct { - unversionedcore.CoreInterface - n *FakeNodeHandler -} - -func (c *FakeNodeHandler) Core() unversionedcore.CoreInterface { - return &FakeLegacyHandler{c.Clientset.Core(), c} -} - -func (m *FakeLegacyHandler) Nodes() unversionedcore.NodeInterface { - return m.n -} - -func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { - m.createLock.Lock() - defer func() { - m.RequestCount++ - m.createLock.Unlock() - }() - for _, n := range m.Existing { - if n.Name == node.Name { - return nil, apierrors.NewAlreadyExists(api.Resource("nodes"), node.Name) - } - } - if m.CreateHook == nil || m.CreateHook(m, node) { - nodeCopy := *node - m.CreatedNodes = append(m.CreatedNodes, &nodeCopy) - return node, nil - } else { - return nil, errors.New("Create error.") - } -} - -func (m *FakeNodeHandler) Get(name string) (*api.Node, error) { - return nil, nil -} - -func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) { - defer func() { m.RequestCount++ }() - var nodes []*api.Node - for i := 0; i < len(m.UpdatedNodes); i++ { - if !contains(m.UpdatedNodes[i], m.DeletedNodes) { - nodes = append(nodes, m.UpdatedNodes[i]) - } - } - for i := 0; i < len(m.Existing); i++ { - if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) { - nodes = append(nodes, m.Existing[i]) - } - } - for i := 0; i < len(m.CreatedNodes); i++ { - if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) { - nodes = append(nodes, m.CreatedNodes[i]) - } - } - nodeList := &api.NodeList{} - for _, node := range nodes { - nodeList.Items = append(nodeList.Items, *node) - } - return nodeList, nil -} - -func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error { - defer func() { - if m.deleteWaitChan != nil { - m.deleteWaitChan <- struct{}{} - } - }() - m.DeletedNodes = append(m.DeletedNodes, newNode(id)) - m.RequestCount++ - return nil -} - -func (m *FakeNodeHandler) DeleteCollection(opt *api.DeleteOptions, listOpts api.ListOptions) error { - return nil -} - -func (m *FakeNodeHandler) 
Update(node *api.Node) (*api.Node, error) { - nodeCopy := *node - m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy) - m.RequestCount++ - return node, nil -} - -func (m *FakeNodeHandler) UpdateStatus(node *api.Node) (*api.Node, error) { - nodeCopy := *node - m.UpdatedNodeStatuses = append(m.UpdatedNodeStatuses, &nodeCopy) - m.RequestCount++ - return node, nil -} - -func (m *FakeNodeHandler) PatchStatus(nodeName string, data []byte) (*api.Node, error) { - m.RequestCount++ - return &api.Node{}, nil -} - -func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) { - return nil, nil -} - -func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte) (*api.Node, error) { - return nil, nil -} - func TestMonitorNodeStatusEvictPods(t *testing.T) { fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) evictionTimeout := 10 * time.Minute @@ -1409,49 +1277,3 @@ func TestCleanupOrphanedPods(t *testing.T) { t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName) } } - -func newNode(name string) *api.Node { - return &api.Node{ - ObjectMeta: api.ObjectMeta{Name: name}, - Spec: api.NodeSpec{ - ExternalID: name, - }, - Status: api.NodeStatus{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - }, - } -} - -func newPod(name, host string) *api.Pod { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Namespace: "default", - Name: name, - }, - Spec: api.PodSpec{ - NodeName: host, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionTrue, - }, - }, - }, - } - - return pod -} - -func contains(node *api.Node, nodes []*api.Node) bool { - for i := 0; i < len(nodes); i++ { - if node.Name == nodes[i].Name { - return true - } - } - return false -} diff --git a/pkg/controller/node/test_utils.go b/pkg/controller/node/test_utils.go new file mode 100644 index 00000000000..a7529b9868c --- /dev/null +++ b/pkg/controller/node/test_utils.go @@ -0,0 +1,237 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "errors" + "sync" + + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + "k8s.io/kubernetes/pkg/watch" +) + +// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It +// allows test cases to have fine-grained control over mock behaviors. We also need +// PodsInterface and PodInterface to test list & delet pods, which is implemented in +// the embedded client.Fake field. 
+type FakeNodeHandler struct { + *fake.Clientset + + // Input: Hooks determine if request is valid or not + CreateHook func(*FakeNodeHandler, *api.Node) bool + Existing []*api.Node + + // Output + CreatedNodes []*api.Node + DeletedNodes []*api.Node + UpdatedNodes []*api.Node + UpdatedNodeStatuses []*api.Node + RequestCount int + + // Synchronization + lock sync.Mutex + deleteWaitChan chan struct{} +} + +type FakeLegacyHandler struct { + unversionedcore.CoreInterface + n *FakeNodeHandler +} + +func (c *FakeNodeHandler) getUpdatedNodesCopy() []*api.Node { + c.lock.Lock() + defer c.lock.Unlock() + updatedNodesCopy := make([]*api.Node, len(c.UpdatedNodes), len(c.UpdatedNodes)) + for i, ptr := range c.UpdatedNodes { + updatedNodesCopy[i] = ptr + } + return updatedNodesCopy +} + +func (c *FakeNodeHandler) Core() unversionedcore.CoreInterface { + return &FakeLegacyHandler{c.Clientset.Core(), c} +} + +func (m *FakeLegacyHandler) Nodes() unversionedcore.NodeInterface { + return m.n +} + +func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + for _, n := range m.Existing { + if n.Name == node.Name { + return nil, apierrors.NewAlreadyExists(api.Resource("nodes"), node.Name) + } + } + if m.CreateHook == nil || m.CreateHook(m, node) { + nodeCopy := *node + m.CreatedNodes = append(m.CreatedNodes, &nodeCopy) + return node, nil + } else { + return nil, errors.New("Create error.") + } +} + +func (m *FakeNodeHandler) Get(name string) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + for i := range m.Existing { + if m.Existing[i].Name == name { + nodeCopy := *m.Existing[i] + return &nodeCopy, nil + } + } + return nil, nil +} + +func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + var nodes []*api.Node + for i := 0; i < len(m.UpdatedNodes); i++ { + if !contains(m.UpdatedNodes[i], m.DeletedNodes) { + nodes = append(nodes, m.UpdatedNodes[i]) + } + } + for i := 0; i < len(m.Existing); i++ { + if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) { + nodes = append(nodes, m.Existing[i]) + } + } + for i := 0; i < len(m.CreatedNodes); i++ { + if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) { + nodes = append(nodes, m.CreatedNodes[i]) + } + } + nodeList := &api.NodeList{} + for _, node := range nodes { + nodeList.Items = append(nodeList.Items, *node) + } + return nodeList, nil +} + +func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error { + m.lock.Lock() + defer func() { + m.RequestCount++ + if m.deleteWaitChan != nil { + m.deleteWaitChan <- struct{}{} + } + m.lock.Unlock() + }() + m.DeletedNodes = append(m.DeletedNodes, newNode(id)) + return nil +} + +func (m *FakeNodeHandler) DeleteCollection(opt *api.DeleteOptions, listOpts api.ListOptions) error { + return nil +} + +func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + nodeCopy := *node + m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy) + return node, nil +} + +func (m *FakeNodeHandler) UpdateStatus(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + nodeCopy := *node + m.UpdatedNodeStatuses = append(m.UpdatedNodeStatuses, &nodeCopy) + return node, nil +} + +func (m *FakeNodeHandler) 
PatchStatus(nodeName string, data []byte) (*api.Node, error) { + m.RequestCount++ + return &api.Node{}, nil +} + +func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) { + return nil, nil +} + +func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte) (*api.Node, error) { + return nil, nil +} + +func newNode(name string) *api.Node { + return &api.Node{ + ObjectMeta: api.ObjectMeta{Name: name}, + Spec: api.NodeSpec{ + ExternalID: name, + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + }, + } +} + +func newPod(name, host string) *api.Pod { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: name, + }, + Spec: api.PodSpec{ + NodeName: host, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + } + + return pod +} + +func contains(node *api.Node, nodes []*api.Node) bool { + for i := 0; i < len(nodes); i++ { + if node.Name == nodes[i].Name { + return true + } + } + return false +}
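
The allocator added in cidr_allocator.go decouples CIDR assignment from the Node write: AllocateOrOccupyCIDR only enqueues a nodeAndCIDR item on a buffered channel (cidrUpdateQueueSize), and a fixed pool of cidrUpdateWorkers goroutines performs the API updates. The following is a minimal, generic sketch of that producer/worker-pool pattern, not code from this patch; the workItem type, the pool and queue sizes, and the use of close/range (instead of the select-on-stop-channel loop used above) are illustrative assumptions.

package main

import (
	"fmt"
	"sync"
)

// workItem is a stand-in for nodeAndCIDR: the node to update and the CIDR it was given.
type workItem struct {
	nodeName string
	cidr     string
}

func main() {
	const workers = 3    // analogous to cidrUpdateWorkers
	const queueSize = 16 // analogous to cidrUpdateQueueSize

	queue := make(chan workItem, queueSize) // buffered so the producer rarely blocks
	var wg sync.WaitGroup

	// Fixed pool of workers draining the queue, like the goroutines
	// started in NewCIDRRangeAllocator.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for item := range queue { // exits when the channel is closed
				fmt.Printf("worker %d: assigning %s to %s\n", id, item.cidr, item.nodeName)
			}
		}(i)
	}

	// Producer side: the allocator only enqueues and returns immediately.
	for i := 0; i < 5; i++ {
		queue <- workItem{nodeName: fmt.Sprintf("node%d", i), cidr: fmt.Sprintf("10.0.%d.0/24", i)}
	}
	close(queue)
	wg.Wait()
}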
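
The new cidr_set.go addresses each candidate subnet by an integer index: allocateNext shifts the index into the host bits of the cluster CIDR base address, and getIndexForCIDR recovers the index by XORing the two base addresses. Below is a standalone, IPv4-only re-derivation of that arithmetic for reference; the helper names indexToCIDR and cidrToIndex are invented for illustration and do not appear in the patch.

package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// indexToCIDR returns the i-th subnet of size subNetMaskSize inside clusterCIDR,
// mirroring the shift-and-OR used by cidrSet.allocateNext.
func indexToCIDR(clusterCIDR *net.IPNet, subNetMaskSize, i int) *net.IPNet {
	base := binary.BigEndian.Uint32(clusterCIDR.IP.To4())
	ipInt := base | uint32(i)<<uint32(32-subNetMaskSize)
	ip := make([]byte, 4)
	binary.BigEndian.PutUint32(ip, ipInt)
	return &net.IPNet{IP: ip, Mask: net.CIDRMask(subNetMaskSize, 32)}
}

// cidrToIndex inverts indexToCIDR, mirroring cidrSet.getIndexForCIDR.
func cidrToIndex(clusterCIDR *net.IPNet, subNetMaskSize int, cidr *net.IPNet) int {
	base := binary.BigEndian.Uint32(clusterCIDR.IP.To4())
	sub := binary.BigEndian.Uint32(cidr.IP.To4())
	return int((base ^ sub) >> uint32(32-subNetMaskSize))
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("192.168.0.0/16")
	c := indexToCIDR(clusterCIDR, 24, 12)
	fmt.Println(c)                               // 192.168.12.0/24
	fmt.Println(cidrToIndex(clusterCIDR, 24, c)) // 12
}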
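
filterOutServiceRange (and the range check in cidrSet.occupy) detects an overlap between two CIDRs with a pair of Contains checks on masked base addresses. A small sketch of why that test is sufficient, using arbitrary example ranges rather than values from this patch:

package main

import (
	"fmt"
	"net"
)

// overlaps reports whether two CIDRs intersect, using the same two Contains
// checks as rangeAllocator.filterOutServiceRange: for CIDRs, a nonempty
// intersection means one range contains the other's masked base address.
func overlaps(a, b *net.IPNet) bool {
	return a.Contains(b.IP.Mask(a.Mask)) || b.Contains(a.IP.Mask(b.Mask))
}

func main() {
	_, cluster, _ := net.ParseCIDR("10.244.0.0/16")
	_, service, _ := net.ParseCIDR("10.244.0.0/20") // inside the cluster range
	_, other, _ := net.ParseCIDR("10.96.0.0/12")    // disjoint from it

	fmt.Println(overlaps(cluster, service)) // true
	fmt.Println(overlaps(cluster, other))   // false
}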