From 66d1fdd77b5a239e1269134967ade6fb91473f96 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Thu, 14 Apr 2016 10:28:40 -0400 Subject: [PATCH] UPSTREAM: 24208: Honor starting resourceVersion in watch cache --- .../k8s.io/kubernetes/pkg/storage/cacher.go | 13 +++-- .../kubernetes/pkg/storage/cacher_test.go | 48 +++++++++++++++++++ .../kubernetes/pkg/storage/watch_cache.go | 9 ++-- 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher.go b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher.go index 652a03b7281d..c28d55874d98 100644 --- a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher.go +++ b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher.go @@ -269,7 +269,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() - watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) + watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) c.watchers[c.watcherIdx] = watcher c.watcherIdx++ return watcher, nil @@ -470,7 +470,7 @@ type cacheWatcher struct { forget func(bool) } -func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), @@ -478,7 +478,7 @@ func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget fun stopped: false, forget: forget, } - go watcher.process(initEvents) + go watcher.process(initEvents, resourceVersion) return watcher } @@ -540,7 +540,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []watchCacheEvent) { +func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() for _, event := range initEvents { @@ -553,6 +553,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) { if !ok { return } - c.sendWatchCacheEvent(event) + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(event) + } } } diff --git a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher_test.go b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher_test.go index 7356646ce1bc..9fb0e0774aab 100644 --- a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher_test.go +++ b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/cacher_test.go @@ -343,3 +343,51 @@ func TestFiltering(t *testing.T) { verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) } + +func TestStartingResourceVersion(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) + defer cacher.Stop() + + // add 1 object + podFoo := makeTestPod("foo") + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + + // Set up Watch starting at fooCreated.ResourceVersion + 10 + rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + rv += 10 + startVersion := strconv.Itoa(int(rv)) + + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + lastFoo := fooCreated + for i := 0; i < 11; i++ { + podFooForUpdate := makeTestPod("foo") + podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)} + lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo) + } + + select { + case e := <-watcher.ResultChan(): + pod := e.Object.(*api.Pod) + podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // event should have at least rv + 1, since we're starting the watch at rv + if podRV <= rv { + t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for event") + } +} diff --git a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/watch_cache.go b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/watch_cache.go index 87bce0c0e6a3..3b34479fa2ac 100644 --- a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/watch_cache.go +++ b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/storage/watch_cache.go @@ -42,9 +42,10 @@ const ( // the previous value of the object to enable proper filtering in the // upper layers. type watchCacheEvent struct { - Type watch.EventType - Object runtime.Object - PrevObject runtime.Object + Type watch.EventType + Object runtime.Object + PrevObject runtime.Object + ResourceVersion uint64 } // watchCacheElement is a single "watch event" stored in a cache. @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if exists { prevObject = previous.(runtime.Object) } - watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject} + watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} if w.onEvent != nil { w.onEvent(watchCacheEvent) }