From eb947f98eaea83cc5b660f805daba2b6f109502b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Valais?= Date: Tue, 20 Jul 2021 14:16:59 +0200 Subject: [PATCH 1/3] memory leak: add unit test to show scheduler leaking goroutines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maël Valais --- pkg/scheduler/BUILD.bazel | 5 ++- pkg/scheduler/scheduler.go | 5 ++- pkg/scheduler/scheduler_test.go | 59 ++++++++++++++++++++++++++++++--- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/BUILD.bazel b/pkg/scheduler/BUILD.bazel index 866927c91..0c6bd8f3b 100644 --- a/pkg/scheduler/BUILD.bazel +++ b/pkg/scheduler/BUILD.bazel @@ -12,7 +12,10 @@ go_test( name = "go_default_test", srcs = ["scheduler_test.go"], embed = [":go_default_library"], - deps = ["@io_k8s_utils//clock:go_default_library"], + deps = [ + "@com_github_stretchr_testify//assert:go_default_library", + "@io_k8s_utils//clock:go_default_library", + ], ) filegroup( diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index adef79a23..776257411 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -27,7 +27,7 @@ import ( // This little bit of wrapping needs to be done because go doesn't do // covariance, but it does coerce *time.Timer into stoppable implicitly if we // write it out like so. -var afterFunc = func(c clock.Clock, d time.Duration, f func()) stoppable { +func afterFunc(c clock.Clock, d time.Duration, f func()) stoppable { t := c.NewTimer(d) go func() { @@ -66,6 +66,9 @@ type scheduledWorkQueue struct { clock clock.Clock work map[interface{}]stoppable workLock sync.Mutex + + // Testing purposes. + afterFunc func(clock.Clock, time.Duration, func()) stoppable } // NewScheduledWorkQueue will create a new workqueue with the given processFunc diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index ecab88ab7..532c1e26b 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -17,16 +17,65 @@ limitations under the License. package scheduler import ( + "runtime" "sync" "testing" "time" + "github.com/stretchr/testify/assert" "k8s.io/utils/clock" ) +func Test_afterFunc(t *testing.T) { + // Note that re-implimenting AfterFunc is not a good idea, since testing it + // is tricky as seen in time_test.go in the standard library. We will just + // focus on two important cases: "f" should be run after the duration + + t.Run("stop works", func(t *testing.T) { + // This test makes sure afterFunc does not leak goroutines. + // + // This test may be run concurrently to other tests, so we want to avoid + // being affected by the goroutines from other tests. To do that, we start a + // huge number of afterFunc and check that the number of goroutines before + // and after more or less match. + expected := runtime.NumGoroutine() + var items []stoppable + for i := 1; i <= 10000; i++ { + items = append(items, afterFunc(clock.RealClock{}, 1*time.Hour, func() { + t.Errorf("should never be called") + })) + } + + for _, item := range items { + item.Stop() + } + + // We don't know when the goroutines will actually finish. + time.Sleep(100 * time.Millisecond) + + t.Logf("%d goroutines before, %d goroutines after", expected, runtime.NumGoroutine()) + + assert.InDelta(t, expected, runtime.NumGoroutine(), 100) + }) + + t.Run("f is called after 100 milliseconds", func(t *testing.T) { + var end time.Time + wait := make(chan struct{}) + + start := time.Now() + + afterFunc(clock.RealClock{}, 100*time.Millisecond, func() { + end = time.Now() + close(wait) + }) + + <-wait + assert.InDelta(t, 100*time.Millisecond, end.Sub(start), float64(1*time.Millisecond)) + }) +} + func TestAdd(t *testing.T) { after := newMockAfter() - afterFunc = after.AfterFunc var wg sync.WaitGroup type testT struct { @@ -56,6 +105,7 @@ func TestAdd(t *testing.T) { } waitSubtest <- struct{}{} }) + queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc queue.Add(test.obj, test.duration) after.warp(test.duration + time.Millisecond) <-waitSubtest @@ -68,7 +118,6 @@ func TestAdd(t *testing.T) { func TestForget(t *testing.T) { after := newMockAfter() - afterFunc = after.AfterFunc var wg sync.WaitGroup type testT struct { @@ -85,9 +134,10 @@ func TestForget(t *testing.T) { t.Run(test.obj, func(test testT) func(*testing.T) { return func(t *testing.T) { defer wg.Done() - queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) { + queue := NewScheduledWorkQueue(clock.RealClock{}, func(_ interface{}) { t.Errorf("scheduled function should never be called") }) + queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc queue.Add(test.obj, test.duration) queue.Forget(test.obj) after.warp(test.duration * 2) @@ -102,11 +152,12 @@ func TestForget(t *testing.T) { // doesn't end up hitting a data-race / leaking a timer. func TestConcurrentAdd(t *testing.T) { after := newMockAfter() - afterFunc = after.AfterFunc + var wg sync.WaitGroup queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) { t.Fatalf("should not be called, but was called with %v", obj) }) + queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc for i := 0; i < 1000; i++ { wg.Add(1) From 8e872632f4a1ab7536ade7163ff9d9c21ee01ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Valais?= Date: Tue, 20 Jul 2021 15:52:55 +0200 Subject: [PATCH 2/3] memory leak: the afterFunc goroutine now stops properly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maël Valais --- pkg/scheduler/scheduler.go | 58 +++++++++++++++++++-------------- pkg/scheduler/scheduler_test.go | 14 ++++---- 2 files changed, 42 insertions(+), 30 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 776257411..4a7624e3c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -23,27 +23,34 @@ import ( "k8s.io/utils/clock" ) -// For mocking purposes. -// This little bit of wrapping needs to be done because go doesn't do -// covariance, but it does coerce *time.Timer into stoppable implicitly if we -// write it out like so. -func afterFunc(c clock.Clock, d time.Duration, f func()) stoppable { +// We are writting our own time.AfterFunc to be able to mock the clock. The +// cancel function can be called concurrently. +func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) { t := c.NewTimer(d) + cancelCh := make(chan struct{}) + cancelOnce := sync.Once{} + cancel = func() { + t.Stop() + cancelOnce.Do(func() { + close(cancelCh) + }) + } go func() { - defer t.Stop() - if ti := <-t.C(); ti == (time.Time{}) { + defer cancel() + + select { + case <-t.C(): + // We don't need to check whether the channel has returned a zero + // value since t.C is never closed as per the timer.Stop + // documentation. + f() + case <-cancelCh: return } - f() }() - return t -} - -// stoppable is the subset of time.Timer which we use, split out for mocking purposes -type stoppable interface { - Stop() bool + return cancel } // ProcessFunc is a function to process an item in the work queue. @@ -64,11 +71,11 @@ type ScheduledWorkQueue interface { type scheduledWorkQueue struct { processFunc ProcessFunc clock clock.Clock - work map[interface{}]stoppable + work map[interface{}]func() workLock sync.Mutex // Testing purposes. - afterFunc func(clock.Clock, time.Duration, func()) stoppable + afterFunc func(clock.Clock, time.Duration, func()) func() } // NewScheduledWorkQueue will create a new workqueue with the given processFunc @@ -76,8 +83,10 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled return &scheduledWorkQueue{ processFunc: processFunc, clock: clock, - work: make(map[interface{}]stoppable), + work: make(map[interface{}]func()), workLock: sync.Mutex{}, + + afterFunc: afterFunc, } } @@ -87,7 +96,12 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) { s.workLock.Lock() defer s.workLock.Unlock() - s.forget(obj) + + if cancel, ok := s.work[obj]; ok { + cancel() + delete(s.work, obj) + } + s.work[obj] = afterFunc(s.clock, duration, func() { defer s.Forget(obj) s.processFunc(obj) @@ -98,13 +112,9 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) { func (s *scheduledWorkQueue) Forget(obj interface{}) { s.workLock.Lock() defer s.workLock.Unlock() - s.forget(obj) -} -// forget cancels and removes an item. It *must* be called with the lock already held -func (s *scheduledWorkQueue) forget(obj interface{}) { - if timer, ok := s.work[obj]; ok { - timer.Stop() + if cancel, ok := s.work[obj]; ok { + cancel() delete(s.work, obj) } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 532c1e26b..390c8390f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -39,15 +39,15 @@ func Test_afterFunc(t *testing.T) { // huge number of afterFunc and check that the number of goroutines before // and after more or less match. expected := runtime.NumGoroutine() - var items []stoppable + var cancels []func() for i := 1; i <= 10000; i++ { - items = append(items, afterFunc(clock.RealClock{}, 1*time.Hour, func() { + cancels = append(cancels, afterFunc(clock.RealClock{}, 1*time.Hour, func() { t.Errorf("should never be called") })) } - for _, item := range items { - item.Stop() + for _, cancel := range cancels { + cancel() } // We don't know when the goroutines will actually finish. @@ -198,7 +198,7 @@ func newMockAfter() *mockAfter { } } -func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppable { +func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) func() { m.lock.Lock() defer m.lock.Unlock() @@ -207,7 +207,9 @@ func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppabl t: m.currentTime.Add(d), } m.queue = append(m.queue, item) - return item + return func() { + item.Stop() + } } func (m *mockAfter) warp(d time.Duration) { From 641960b666e94b8f592652f14f7b1fc313c7e97b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Valais?= Date: Tue, 20 Jul 2021 16:15:39 +0200 Subject: [PATCH 3/3] memory leak: clean up scheduler goroutine on certificate deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Maël Valais --- pkg/controller/acmeorders/controller.go | 7 +++ .../trigger/trigger_controller.go | 7 +++ pkg/scheduler/scheduler.go | 61 ++++++++++--------- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/pkg/controller/acmeorders/controller.go b/pkg/controller/acmeorders/controller.go index 58df696c8..19eec594a 100644 --- a/pkg/controller/acmeorders/controller.go +++ b/pkg/controller/acmeorders/controller.go @@ -158,6 +158,13 @@ func NewController( } func (c *controller) ProcessItem(ctx context.Context, key string) error { + // We don't want to leak goroutines, so let's remove this key from the + // scheduled queue in case it has been scheduled. We only need to + // un-schedule the key for 'Deleted' events, but ProcessItem does not allow + // us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that + // there is no unit test around this. + c.scheduledWorkQueue.Forget(key) + log := logf.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { diff --git a/pkg/controller/certificates/trigger/trigger_controller.go b/pkg/controller/certificates/trigger/trigger_controller.go index 4f146e54e..6a3efe490 100644 --- a/pkg/controller/certificates/trigger/trigger_controller.go +++ b/pkg/controller/certificates/trigger/trigger_controller.go @@ -124,6 +124,13 @@ func NewController( } func (c *controller) ProcessItem(ctx context.Context, key string) error { + // We don't want to leak goroutines, so let's remove this key from the + // scheduled queue in case it has been scheduled. We only need to + // un-schedule the key for 'Deleted' events, but ProcessItem does not allow + // us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that + // there is no unit test around this. + c.scheduledWorkQueue.Forget(key) + log := logf.FromContext(ctx).WithValues("key", key) ctx = logf.NewContext(ctx, log) namespace, name, err := cache.SplitMetaNamespaceKey(key) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 4a7624e3c..0d4f4c210 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -23,36 +23,6 @@ import ( "k8s.io/utils/clock" ) -// We are writting our own time.AfterFunc to be able to mock the clock. The -// cancel function can be called concurrently. -func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) { - t := c.NewTimer(d) - cancelCh := make(chan struct{}) - cancelOnce := sync.Once{} - cancel = func() { - t.Stop() - cancelOnce.Do(func() { - close(cancelCh) - }) - } - - go func() { - defer cancel() - - select { - case <-t.C(): - // We don't need to check whether the channel has returned a zero - // value since t.C is never closed as per the timer.Stop - // documentation. - f() - case <-cancelCh: - return - } - }() - - return cancel -} - // ProcessFunc is a function to process an item in the work queue. type ProcessFunc func(interface{}) @@ -118,3 +88,34 @@ func (s *scheduledWorkQueue) Forget(obj interface{}) { delete(s.work, obj) } } + +// We are writting our own time.AfterFunc to be able to mock the clock. The +// cancel function can be called concurrently. +func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) { + t := c.NewTimer(d) + + // The caller expects `cancel` to stop the goroutine. Since `t.C` is not + // closed after calling `t.Stop`, we need to create our own `cancelCh` + // channel to stop the goroutine. + cancelCh := make(chan struct{}) + cancelOnce := sync.Once{} + cancel = func() { + t.Stop() + cancelOnce.Do(func() { + close(cancelCh) + }) + } + + go func() { + defer cancel() + + select { + case <-t.C(): + f() + case <-cancelCh: + return + } + }() + + return cancel +}