mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #51633 from deads2k/controller-03-wait
Automatic merge from submit-queue (batch tested with PRs 51707, 51662, 51723, 50163, 51633) update GC controller to wait until controllers have been initialized … fixes #51013 Alternative to https://github.com/kubernetes/kubernetes/pull/51492 which keeps those few controllers (only one) from starting the informers early.
This commit is contained in:
		@@ -157,6 +157,7 @@ func Run(s *options.CMServer) error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		ctx.InformerFactory.Start(ctx.Stop)
 | 
							ctx.InformerFactory.Start(ctx.Stop)
 | 
				
			||||||
 | 
							close(ctx.InformersStarted)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		select {}
 | 
							select {}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -264,6 +265,10 @@ type ControllerContext struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Stop is the stop channel
 | 
						// Stop is the stop channel
 | 
				
			||||||
	Stop <-chan struct{}
 | 
						Stop <-chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// InformersStarted is closed after all of the controllers have been initialized and are running.  After this point it is safe,
 | 
				
			||||||
 | 
						// for an individual controller to start the shared informers. Before it is closed, they should not.
 | 
				
			||||||
 | 
						InformersStarted chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c ControllerContext) IsControllerEnabled(name string) bool {
 | 
					func (c ControllerContext) IsControllerEnabled(name string) bool {
 | 
				
			||||||
@@ -443,6 +448,7 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
 | 
				
			|||||||
		AvailableResources: availableResources,
 | 
							AvailableResources: availableResources,
 | 
				
			||||||
		Cloud:              cloud,
 | 
							Cloud:              cloud,
 | 
				
			||||||
		Stop:               stop,
 | 
							Stop:               stop,
 | 
				
			||||||
 | 
							InformersStarted:   make(chan struct{}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return ctx, nil
 | 
						return ctx, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -330,6 +330,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
 | 
				
			|||||||
		deletableResources,
 | 
							deletableResources,
 | 
				
			||||||
		ignoredResources,
 | 
							ignoredResources,
 | 
				
			||||||
		ctx.InformerFactory,
 | 
							ctx.InformerFactory,
 | 
				
			||||||
 | 
							ctx.InformersStarted,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
 | 
							return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -86,6 +86,7 @@ func NewGarbageCollector(
 | 
				
			|||||||
	deletableResources map[schema.GroupVersionResource]struct{},
 | 
						deletableResources map[schema.GroupVersionResource]struct{},
 | 
				
			||||||
	ignoredResources map[schema.GroupResource]struct{},
 | 
						ignoredResources map[schema.GroupResource]struct{},
 | 
				
			||||||
	sharedInformers informers.SharedInformerFactory,
 | 
						sharedInformers informers.SharedInformerFactory,
 | 
				
			||||||
 | 
						informersStarted <-chan struct{},
 | 
				
			||||||
) (*GarbageCollector, error) {
 | 
					) (*GarbageCollector, error) {
 | 
				
			||||||
	attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
 | 
						attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
 | 
				
			||||||
	attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
 | 
						attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
 | 
				
			||||||
@@ -100,6 +101,7 @@ func NewGarbageCollector(
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	gb := &GraphBuilder{
 | 
						gb := &GraphBuilder{
 | 
				
			||||||
		metaOnlyClientPool:                  metaOnlyClientPool,
 | 
							metaOnlyClientPool:                  metaOnlyClientPool,
 | 
				
			||||||
 | 
							informersStarted:                    informersStarted,
 | 
				
			||||||
		registeredRateLimiterForControllers: NewRegisteredRateLimiter(deletableResources),
 | 
							registeredRateLimiterForControllers: NewRegisteredRateLimiter(deletableResources),
 | 
				
			||||||
		restMapper:                          mapper,
 | 
							restMapper:                          mapper,
 | 
				
			||||||
		graphChanges:                        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
 | 
							graphChanges:                        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -72,7 +72,9 @@ func TestGarbageCollectorConstruction(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// No monitor will be constructed for the non-core resource, but the GC
 | 
						// No monitor will be constructed for the non-core resource, but the GC
 | 
				
			||||||
	// construction will not fail.
 | 
						// construction will not fail.
 | 
				
			||||||
	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers)
 | 
						alwaysStarted := make(chan struct{})
 | 
				
			||||||
 | 
						close(alwaysStarted)
 | 
				
			||||||
 | 
						gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatal(err)
 | 
							t.Fatal(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -174,7 +176,9 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
 | 
				
			|||||||
	podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
 | 
						podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
 | 
				
			||||||
	client := fake.NewSimpleClientset()
 | 
						client := fake.NewSimpleClientset()
 | 
				
			||||||
	sharedInformers := informers.NewSharedInformerFactory(client, 0)
 | 
						sharedInformers := informers.NewSharedInformerFactory(client, 0)
 | 
				
			||||||
	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers)
 | 
						alwaysStarted := make(chan struct{})
 | 
				
			||||||
 | 
						close(alwaysStarted)
 | 
				
			||||||
 | 
						gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers, alwaysStarted)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatal(err)
 | 
							t.Fatal(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -358,8 +362,11 @@ func TestProcessEvent(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						alwaysStarted := make(chan struct{})
 | 
				
			||||||
 | 
						close(alwaysStarted)
 | 
				
			||||||
	for _, scenario := range testScenarios {
 | 
						for _, scenario := range testScenarios {
 | 
				
			||||||
		dependencyGraphBuilder := &GraphBuilder{
 | 
							dependencyGraphBuilder := &GraphBuilder{
 | 
				
			||||||
 | 
								informersStarted: alwaysStarted,
 | 
				
			||||||
			graphChanges:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 | 
								graphChanges:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 | 
				
			||||||
			uidToNode: &concurrentUIDToNode{
 | 
								uidToNode: &concurrentUIDToNode{
 | 
				
			||||||
				uidToNodeLock: sync.RWMutex{},
 | 
									uidToNodeLock: sync.RWMutex{},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -78,6 +78,10 @@ type GraphBuilder struct {
 | 
				
			|||||||
	// dependencyGraphBuilder
 | 
						// dependencyGraphBuilder
 | 
				
			||||||
	monitors    monitors
 | 
						monitors    monitors
 | 
				
			||||||
	monitorLock sync.Mutex
 | 
						monitorLock sync.Mutex
 | 
				
			||||||
 | 
						// informersStarted is closed after after all of the controllers have been initialized and are running.
 | 
				
			||||||
 | 
						// After that it is safe to start them here, before that it is not.
 | 
				
			||||||
 | 
						informersStarted <-chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
 | 
						// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
 | 
				
			||||||
	// called yet. If it is non-nil, then when closed it indicates everything
 | 
						// called yet. If it is non-nil, then when closed it indicates everything
 | 
				
			||||||
	// should shut down.
 | 
						// should shut down.
 | 
				
			||||||
@@ -279,6 +283,10 @@ func (gb *GraphBuilder) startMonitors() {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// we're waiting until after the informer start that happens once all the controllers are initialized.  This ensures
 | 
				
			||||||
 | 
						// that they don't get unexpected events on their work queues.
 | 
				
			||||||
 | 
						<-gb.informersStarted
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	monitors := gb.monitors
 | 
						monitors := gb.monitors
 | 
				
			||||||
	started := 0
 | 
						started := 0
 | 
				
			||||||
	for _, monitor := range monitors {
 | 
						for _, monitor := range monitors {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -242,6 +242,8 @@ func setup(t *testing.T, workerCount int) *testContext {
 | 
				
			|||||||
	metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 | 
						metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 | 
				
			||||||
	clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 | 
						clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 | 
				
			||||||
	sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
 | 
						sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
 | 
				
			||||||
 | 
						alwaysStarted := make(chan struct{})
 | 
				
			||||||
 | 
						close(alwaysStarted)
 | 
				
			||||||
	gc, err := garbagecollector.NewGarbageCollector(
 | 
						gc, err := garbagecollector.NewGarbageCollector(
 | 
				
			||||||
		metaOnlyClientPool,
 | 
							metaOnlyClientPool,
 | 
				
			||||||
		clientPool,
 | 
							clientPool,
 | 
				
			||||||
@@ -249,6 +251,7 @@ func setup(t *testing.T, workerCount int) *testContext {
 | 
				
			|||||||
		deletableResources,
 | 
							deletableResources,
 | 
				
			||||||
		garbagecollector.DefaultIgnoredResources(),
 | 
							garbagecollector.DefaultIgnoredResources(),
 | 
				
			||||||
		sharedInformers,
 | 
							sharedInformers,
 | 
				
			||||||
 | 
							alwaysStarted,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("failed to create garbage collector: %v", err)
 | 
							t.Fatalf("failed to create garbage collector: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user