Merge pull request #117137 from p0lyn0mial/upstream-streaming-api-deadlock

cacher: prevent a potential deadlock
@@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		identifier,
 	)
 
+	// note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without
+	// the c.watchCache.RLock held otherwise we are at risk of a deadlock
+	// mainly because c.watchCache.processEvent method won't be able to make progress
+	//
+	// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
+	// it is safe to release the lock after the method finishes because we don't require
+	// any atomicity between the call to the method and further calls that actually get the events.
+	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
+	if err != nil {
+		return newErrWatcher(err), nil
+	}
+
 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
 	// on return from this function.
@@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// underlying watchCache is calling processEvent under its lock.
 	c.watchCache.RLock()
 	defer c.watchCache.RUnlock()
-	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
-	if err != nil {
-		return newErrWatcher(err), nil
-	}
 	startWatchRV := startWatchResourceVersionFn()
 	var cacheInterval *watchCacheInterval
 	if forceAllEvents {
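The comment block added above is the heart of the fix: waitUntilWatchCacheFreshAndForceAllEvents blocks until the watchCache catches up to the requested resource version, and the only thing that advances the cache is processEvent, which needs the cache's write lock. Calling the wait while Watch already holds c.watchCache.RLock() therefore starves the writer forever, so the patch hoists the call above the RLock. Below is a minimal, self-contained sketch of the pattern; the names are illustrative toys, not the real cacher types, and the cond tied to the read locker only roughly mirrors how watchCache signals freshness:

package main

import (
	"fmt"
	"sync"
	"time"
)

// toyCache stands in for watchCache: freshness is signalled on a cond
// tied to the read side of the same RWMutex the writer must acquire.
type toyCache struct {
	mu   sync.RWMutex
	cond *sync.Cond
	rv   int // last resource version processed
}

func newToyCache() *toyCache {
	c := &toyCache{}
	c.cond = sync.NewCond(c.mu.RLocker())
	return c
}

// processEvent is the writer: it needs the full write lock to bump rv.
func (c *toyCache) processEvent(rv int) {
	c.mu.Lock()
	c.rv = rv
	c.mu.Unlock()
	c.cond.Broadcast()
}

// waitUntilFresh takes its own read lock and parks until rv >= want.
func (c *toyCache) waitUntilFresh(want int) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for c.rv < want {
		c.cond.Wait() // releases this RLock while parked
	}
}

func main() {
	c := newToyCache()
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.processEvent(105) // the event that makes the cache fresh
	}()

	// Fixed ordering (what the patch does): wait first, lock after.
	c.waitUntilFresh(105)
	c.mu.RLock()
	fmt.Println("cache fresh at rv", c.rv)
	c.mu.RUnlock()

	// Buggy ordering (what the patch removes): taking c.mu.RLock() first
	// and then calling waitUntilFresh would deadlock: processEvent's
	// mu.Lock() is never granted while our RLock is held, so the
	// Broadcast that would wake the waiter never happens.
}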
@@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
 }
 
 func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
 	if err != nil {
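The one added line above enables the WatchList feature gate for the lifetime of this test. The trailing () does real work: at this point in the tree, SetFeatureGateDuringTest flips the gate immediately and returns a restore closure, so the defer runs only the restore. A minimal sketch of the pattern, assuming the import paths usually used alongside these identifiers:

package cacher_test

import (
	"testing"

	"k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
)

func TestWithWatchListEnabled(t *testing.T) {
	// Enable the gate now; defer only the returned restore func, which
	// resets the gate to its previous value when the test finishes.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()

	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
		t.Fatal("expected WatchList to be enabled within this test")
	}
}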
@@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 	}
 	defer cacher.Stop()
 
-	forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NotNil(t, err, "the target method should return non nil error")
-	require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
-	require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
+	opts := storage.ListOptions{
+		Predicate:         storage.Everything,
+		SendInitialEvents: pointer.Bool(true),
+		ResourceVersion:   "105",
+	}
+	opts.Predicate.AllowWatchBookmarks = true
+
+	w, err := cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type: watch.Error,
+			Object: &metav1.Status{
+				Status:  metav1.StatusFailure,
+				Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
+				Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
+				Reason:  metav1.StatusReasonTimeout,
+				Code:    504,
+			},
+		},
+	}, true)
 
 	go func() {
 		cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
 	}()
-	forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NoError(t, err)
-	require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
+	w, err = cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type:   watch.Added,
+			Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
+		},
+	}, true)
 }
 
 type fakeStorage struct {
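For reference, the first verifyEvents block above expects the watch to surface the standard "too large resource version" failure as a watch.Error event rather than as a plain error return. Where the asserted fields come from is easiest to see by decomposing the error directly; a small sketch assuming the usual k8s.io/apiserver and k8s.io/apimachinery packages, with a literal 1 standing in for the resourceVersionTooHighRetrySeconds constant the test uses:

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apiserver/pkg/storage"
)

func main() {
	// Requested RV 105 against a cache still at RV 100.
	err := storage.NewTooLargeResourceVersionError(105, 100, 1)

	// The error is a *StatusError, which is why the test can pull
	// .Status().Details out of it for the expected watch.Error object.
	status := err.(*apierrors.StatusError).Status()
	fmt.Println(status.Reason)  // Timeout
	fmt.Println(status.Code)    // 504
	fmt.Println(status.Message) // Timeout: Too large resource version: 105, current: 100
}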