mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	cacher allow context cancellation if not ready (#116024)
* cacher allow context cancellation if not ready Replace the sync.Cond variable with a channel so we can use the context cancellation signal. Co-authored-by: Wojciech Tyczy<C5><84>ski <wojtekt@google.com> Change-Id: I2f75313a6337feee440ece4c1e873c32a12560dd * wait again on pending state Change-Id: I1ad79253a5a5d56a4d9611125825b1f7ad552be8 --------- Co-authored-by: Wojciech Tyczy<C5><84>ski <wojtekt@google.com>
This commit is contained in:
		@@ -252,7 +252,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -510,7 +510,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := c.ready.wait(); err != nil {
 | 
						if err := c.ready.wait(ctx); err != nil {
 | 
				
			||||||
		return nil, errors.NewServiceUnavailable(err.Error())
 | 
							return nil, errors.NewServiceUnavailable(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -622,7 +622,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Do not create a trace - it's not for free and there are tons
 | 
						// Do not create a trace - it's not for free and there are tons
 | 
				
			||||||
	// of Get requests. We can add it if it will be really needed.
 | 
						// of Get requests. We can add it if it will be really needed.
 | 
				
			||||||
	if err := c.ready.wait(); err != nil {
 | 
						if err := c.ready.wait(ctx); err != nil {
 | 
				
			||||||
		return errors.NewServiceUnavailable(err.Error())
 | 
							return errors.NewServiceUnavailable(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -714,7 +714,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
 | 
				
			|||||||
		attribute.Stringer("type", c.groupResource))
 | 
							attribute.Stringer("type", c.groupResource))
 | 
				
			||||||
	defer span.End(500 * time.Millisecond)
 | 
						defer span.End(500 * time.Millisecond)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := c.ready.wait(); err != nil {
 | 
						if err := c.ready.wait(ctx); err != nil {
 | 
				
			||||||
		return errors.NewServiceUnavailable(err.Error())
 | 
							return errors.NewServiceUnavailable(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	span.AddEvent("Ready")
 | 
						span.AddEvent("Ready")
 | 
				
			||||||
@@ -1149,7 +1149,7 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
 | 
					// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
 | 
				
			||||||
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
 | 
					func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
 | 
				
			||||||
	if err := c.ready.wait(); err != nil {
 | 
						if err := c.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		return 0, errors.NewServiceUnavailable(err.Error())
 | 
							return 0, errors.NewServiceUnavailable(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
						apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
				
			||||||
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/meta"
 | 
						"k8s.io/apimachinery/pkg/api/meta"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/fields"
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
@@ -185,7 +186,7 @@ func TestGetListCacheBypass(t *testing.T) {
 | 
				
			|||||||
	result := &example.PodList{}
 | 
						result := &example.PodList{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -224,7 +225,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
 | 
				
			|||||||
	result := &example.PodList{}
 | 
						result := &example.PodList{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -258,7 +259,7 @@ func TestGetCacheBypass(t *testing.T) {
 | 
				
			|||||||
	result := &example.Pod{}
 | 
						result := &example.Pod{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -290,7 +291,7 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -313,6 +314,34 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
				
			||||||
 | 
						// Configure cacher so that it can't initialize, because of
 | 
				
			||||||
 | 
						// constantly failing lists to the underlying storage.
 | 
				
			||||||
 | 
						dummyErr := fmt.Errorf("dummy")
 | 
				
			||||||
 | 
						backingStorage := &dummyStorage{err: dummyErr}
 | 
				
			||||||
 | 
						cacher, _, err := newTestCacher(backingStorage)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Couldn't create cacher: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						// Cancel the watch after some time to check if it will properly
 | 
				
			||||||
 | 
						// terminate instead of hanging forever.
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							defer cancel()
 | 
				
			||||||
 | 
							cacher.clock.Sleep(5 * time.Second)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Watch hangs waiting on watchcache being initialized.
 | 
				
			||||||
 | 
						// Ensure that it terminates when its context is cancelled
 | 
				
			||||||
 | 
						// (e.g. the request is terminated for whatever reason).
 | 
				
			||||||
 | 
						_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
 | 
				
			||||||
 | 
						if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() {
 | 
				
			||||||
 | 
							t.Errorf("Unexpected error: %#v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
					func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
				
			||||||
	backingStorage := &dummyStorage{}
 | 
						backingStorage := &dummyStorage{}
 | 
				
			||||||
	cacher, _, err := newTestCacher(backingStorage)
 | 
						cacher, _, err := newTestCacher(backingStorage)
 | 
				
			||||||
@@ -322,7 +351,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -408,7 +437,7 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -471,7 +500,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pred := storage.Everything
 | 
						pred := storage.Everything
 | 
				
			||||||
@@ -569,7 +598,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pred := storage.Everything
 | 
						pred := storage.Everything
 | 
				
			||||||
@@ -669,7 +698,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
 | 
				
			|||||||
	cacher.bookmarkWatchers.bookmarkFrequency = time.Second
 | 
						cacher.bookmarkWatchers.bookmarkFrequency = time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pred := storage.Everything
 | 
						pred := storage.Everything
 | 
				
			||||||
@@ -739,7 +768,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -817,7 +846,7 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
 | 
				
			|||||||
	cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
 | 
						cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -895,7 +924,7 @@ func TestStartingResourceVersion(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -975,7 +1004,7 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1086,7 +1115,7 @@ func TestCachingDeleteEvents(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1168,7 +1197,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1264,7 +1293,7 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
 | 
				
			|||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait until cacher is initialized.
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
	if err := cacher.ready.wait(); err != nil {
 | 
						if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Ensure there is enough budget for slow processing since
 | 
						// Ensure there is enough budget for slow processing since
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,7 @@ limitations under the License.
 | 
				
			|||||||
package cacher
 | 
					package cacher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -30,67 +31,111 @@ const (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ready is a three state condition variable that blocks until is Ready if is not Stopped.
 | 
					// ready is a three state condition variable that blocks until is Ready if is not Stopped.
 | 
				
			||||||
// Its initial state is Pending.
 | 
					// Its initial state is Pending and its state machine diagram is as follow.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Pending <------> Ready -----> Stopped
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//	|                           ^
 | 
				
			||||||
 | 
					//	└---------------------------┘
 | 
				
			||||||
type ready struct {
 | 
					type ready struct {
 | 
				
			||||||
	state status
 | 
						state       status        // represent the state of the variable
 | 
				
			||||||
	c     *sync.Cond
 | 
						lock        sync.RWMutex  // protect the state variable
 | 
				
			||||||
 | 
						restartLock sync.Mutex    // protect the transition from ready to pending where the channel is recreated
 | 
				
			||||||
 | 
						waitCh      chan struct{} // blocks until is ready or stopped
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newReady() *ready {
 | 
					func newReady() *ready {
 | 
				
			||||||
	return &ready{
 | 
						return &ready{
 | 
				
			||||||
		c:     sync.NewCond(&sync.RWMutex{}),
 | 
							waitCh: make(chan struct{}),
 | 
				
			||||||
		state: Pending,
 | 
							state:  Pending,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// done close the channel once the state is Ready or Stopped
 | 
				
			||||||
 | 
					func (r *ready) done() chan struct{} {
 | 
				
			||||||
 | 
						r.restartLock.Lock()
 | 
				
			||||||
 | 
						defer r.restartLock.Unlock()
 | 
				
			||||||
 | 
						return r.waitCh
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
 | 
					// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
 | 
				
			||||||
func (r *ready) wait() error {
 | 
					func (r *ready) wait(ctx context.Context) error {
 | 
				
			||||||
	r.c.L.Lock()
 | 
						for {
 | 
				
			||||||
	defer r.c.L.Unlock()
 | 
							// r.done() only blocks if state is Pending
 | 
				
			||||||
	for r.state == Pending {
 | 
							select {
 | 
				
			||||||
		r.c.Wait()
 | 
							case <-ctx.Done():
 | 
				
			||||||
	}
 | 
								return ctx.Err()
 | 
				
			||||||
	switch r.state {
 | 
							case <-r.done():
 | 
				
			||||||
	case Ready:
 | 
							}
 | 
				
			||||||
		return nil
 | 
					
 | 
				
			||||||
	case Stopped:
 | 
							r.lock.RLock()
 | 
				
			||||||
		return fmt.Errorf("apiserver cacher is stopped")
 | 
							switch r.state {
 | 
				
			||||||
	default:
 | 
							case Pending:
 | 
				
			||||||
		return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
 | 
								// since we allow to switch between the states Pending and Ready
 | 
				
			||||||
 | 
								// if there is a quick transition from Pending -> Ready -> Pending
 | 
				
			||||||
 | 
								// a process that was waiting can get unblocked and see a Pending
 | 
				
			||||||
 | 
								// state again. If the state is Pending we have to wait again to
 | 
				
			||||||
 | 
								// avoid an inconsistent state on the system, with some processes not
 | 
				
			||||||
 | 
								// waiting despite the state moved back to Pending.
 | 
				
			||||||
 | 
								r.lock.RUnlock()
 | 
				
			||||||
 | 
							case Ready:
 | 
				
			||||||
 | 
								r.lock.RUnlock()
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							case Stopped:
 | 
				
			||||||
 | 
								r.lock.RUnlock()
 | 
				
			||||||
 | 
								return fmt.Errorf("apiserver cacher is stopped")
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								r.lock.RUnlock()
 | 
				
			||||||
 | 
								return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// check returns true only if it is Ready.
 | 
					// check returns true only if it is Ready.
 | 
				
			||||||
func (r *ready) check() bool {
 | 
					func (r *ready) check() bool {
 | 
				
			||||||
	// TODO: Make check() function more sophisticated, in particular
 | 
						r.lock.RLock()
 | 
				
			||||||
	// allow it to behave as "waitWithTimeout".
 | 
						defer r.lock.RUnlock()
 | 
				
			||||||
	rwMutex := r.c.L.(*sync.RWMutex)
 | 
					 | 
				
			||||||
	rwMutex.RLock()
 | 
					 | 
				
			||||||
	defer rwMutex.RUnlock()
 | 
					 | 
				
			||||||
	return r.state == Ready
 | 
						return r.state == Ready
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
 | 
					// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
 | 
				
			||||||
func (r *ready) set(ok bool) {
 | 
					func (r *ready) set(ok bool) {
 | 
				
			||||||
	r.c.L.Lock()
 | 
						r.lock.Lock()
 | 
				
			||||||
	defer r.c.L.Unlock()
 | 
						defer r.lock.Unlock()
 | 
				
			||||||
	if r.state == Stopped {
 | 
						if r.state == Stopped {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if ok {
 | 
						if ok && r.state == Pending {
 | 
				
			||||||
		r.state = Ready
 | 
							r.state = Ready
 | 
				
			||||||
	} else {
 | 
							select {
 | 
				
			||||||
 | 
							case <-r.waitCh:
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								close(r.waitCh)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						} else if !ok && r.state == Ready {
 | 
				
			||||||
 | 
							// creating the waitCh can be racy if
 | 
				
			||||||
 | 
							// something enter the wait() method
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-r.waitCh:
 | 
				
			||||||
 | 
								r.restartLock.Lock()
 | 
				
			||||||
 | 
								r.waitCh = make(chan struct{})
 | 
				
			||||||
 | 
								r.restartLock.Unlock()
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		r.state = Pending
 | 
							r.state = Pending
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	r.c.Broadcast()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// stop the condition variable and set it as Stopped. This state is irreversible.
 | 
					// stop the condition variable and set it as Stopped. This state is irreversible.
 | 
				
			||||||
func (r *ready) stop() {
 | 
					func (r *ready) stop() {
 | 
				
			||||||
	r.c.L.Lock()
 | 
						r.lock.Lock()
 | 
				
			||||||
	defer r.c.L.Unlock()
 | 
						defer r.lock.Unlock()
 | 
				
			||||||
	if r.state != Stopped {
 | 
						if r.state != Stopped {
 | 
				
			||||||
		r.state = Stopped
 | 
							r.state = Stopped
 | 
				
			||||||
		r.c.Broadcast()
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-r.waitCh:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							close(r.waitCh)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,7 +17,9 @@ limitations under the License.
 | 
				
			|||||||
package cacher
 | 
					package cacher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Test_newReady(t *testing.T) {
 | 
					func Test_newReady(t *testing.T) {
 | 
				
			||||||
@@ -27,9 +29,14 @@ func Test_newReady(t *testing.T) {
 | 
				
			|||||||
	// create 10 goroutines waiting for ready
 | 
						// create 10 goroutines waiting for ready
 | 
				
			||||||
	for i := 0; i < 10; i++ {
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
			errCh <- ready.wait()
 | 
								errCh <- ready.wait(context.Background())
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
						case <-errCh:
 | 
				
			||||||
 | 
							t.Errorf("ready should be blocking")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	ready.set(true)
 | 
						ready.set(true)
 | 
				
			||||||
	for i := 0; i < 10; i++ {
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
		if err := <-errCh; err != nil {
 | 
							if err := <-errCh; err != nil {
 | 
				
			||||||
@@ -38,6 +45,61 @@ func Test_newReady(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func Test_newReadySetIdempotent(t *testing.T) {
 | 
				
			||||||
 | 
						errCh := make(chan error, 10)
 | 
				
			||||||
 | 
						ready := newReady()
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						ready.set(true)
 | 
				
			||||||
 | 
						ready.set(true)
 | 
				
			||||||
 | 
						ready.set(true)
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						// create 10 goroutines waiting for ready and stop
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								errCh <- ready.wait(context.Background())
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
						case <-errCh:
 | 
				
			||||||
 | 
							t.Errorf("ready should be blocking")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ready.set(true)
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							if err := <-errCh; err != nil {
 | 
				
			||||||
 | 
								t.Errorf("unexpected error on channel %d", i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Test_newReadyRacy executes all the possible transitions randomly.
 | 
				
			||||||
 | 
					// It must run with the race detector enabled.
 | 
				
			||||||
 | 
					func Test_newReadyRacy(t *testing.T) {
 | 
				
			||||||
 | 
						concurrency := 1000
 | 
				
			||||||
 | 
						errCh := make(chan error, concurrency)
 | 
				
			||||||
 | 
						ready := newReady()
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						for i := 0; i < concurrency; i++ {
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								errCh <- ready.wait(context.Background())
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								ready.set(false)
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								ready.set(true)
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ready.set(true)
 | 
				
			||||||
 | 
						for i := 0; i < concurrency; i++ {
 | 
				
			||||||
 | 
							if err := <-errCh; err != nil {
 | 
				
			||||||
 | 
								t.Errorf("unexpected error %v on channel %d", err, i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Test_newReadyStop(t *testing.T) {
 | 
					func Test_newReadyStop(t *testing.T) {
 | 
				
			||||||
	errCh := make(chan error, 10)
 | 
						errCh := make(chan error, 10)
 | 
				
			||||||
	ready := newReady()
 | 
						ready := newReady()
 | 
				
			||||||
@@ -45,9 +107,14 @@ func Test_newReadyStop(t *testing.T) {
 | 
				
			|||||||
	// create 10 goroutines waiting for ready and stop
 | 
						// create 10 goroutines waiting for ready and stop
 | 
				
			||||||
	for i := 0; i < 10; i++ {
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
			errCh <- ready.wait()
 | 
								errCh <- ready.wait(context.Background())
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
						case <-errCh:
 | 
				
			||||||
 | 
							t.Errorf("ready should be blocking")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	ready.stop()
 | 
						ready.stop()
 | 
				
			||||||
	for i := 0; i < 10; i++ {
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
		if err := <-errCh; err == nil {
 | 
							if err := <-errCh; err == nil {
 | 
				
			||||||
@@ -76,8 +143,32 @@ func Test_newReadyCheck(t *testing.T) {
 | 
				
			|||||||
	if ready.check() {
 | 
						if ready.check() {
 | 
				
			||||||
		t.Errorf("unexpected ready state %v", ready.check())
 | 
							t.Errorf("unexpected ready state %v", ready.check())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	err := ready.wait()
 | 
						err := ready.wait(context.Background())
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
		t.Errorf("expected error waiting on a stopped state")
 | 
							t.Errorf("expected error waiting on a stopped state")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func Test_newReadyCancelPending(t *testing.T) {
 | 
				
			||||||
 | 
						errCh := make(chan error, 10)
 | 
				
			||||||
 | 
						ready := newReady()
 | 
				
			||||||
 | 
						ready.set(false)
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						// create 10 goroutines stuck on pending
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								errCh <- ready.wait(ctx)
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
						case <-errCh:
 | 
				
			||||||
 | 
							t.Errorf("ready should be blocking")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						cancel()
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							if err := <-errCh; err == nil {
 | 
				
			||||||
 | 
								t.Errorf("unexpected success on channel %d", i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user