Merge pull request #4233 from maelvls/goroutine-leak

Memory leak: fix the scheduler's goroutine leakage
This commit is contained in:
jetstack-bot 2021-07-23 20:34:19 +01:00 committed by GitHub
commit 1021b58286
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 39 deletions

View File

@ -158,6 +158,13 @@ func NewController(
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
// We don't want to leak goroutines, so let's remove this key from the
// scheduled queue in case it has been scheduled. We only need to
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
// there is no unit test around this.
c.scheduledWorkQueue.Forget(key)
log := logf.FromContext(ctx)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {

View File

@ -124,6 +124,13 @@ func NewController(
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
// We don't want to leak goroutines, so let's remove this key from the
// scheduled queue in case it has been scheduled. We only need to
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
// there is no unit test around this.
c.scheduledWorkQueue.Forget(key)
log := logf.FromContext(ctx).WithValues("key", key)
ctx = logf.NewContext(ctx, log)
namespace, name, err := cache.SplitMetaNamespaceKey(key)

View File

@ -12,7 +12,10 @@ go_test(
name = "go_default_test",
srcs = ["scheduler_test.go"],
embed = [":go_default_library"],
deps = ["@io_k8s_utils//clock:go_default_library"],
deps = [
"@com_github_stretchr_testify//assert:go_default_library",
"@io_k8s_utils//clock:go_default_library",
],
)
filegroup(

View File

@ -23,29 +23,6 @@ import (
"k8s.io/utils/clock"
)
// For mocking purposes.
// This little bit of wrapping needs to be done because go doesn't do
// covariance, but it does coerce *time.Timer into stoppable implicitly if we
// write it out like so.
var afterFunc = func(c clock.Clock, d time.Duration, f func()) stoppable {
t := c.NewTimer(d)
go func() {
defer t.Stop()
if ti := <-t.C(); ti == (time.Time{}) {
return
}
f()
}()
return t
}
// stoppable is the subset of time.Timer which we use, split out for mocking purposes
type stoppable interface {
Stop() bool
}
// ProcessFunc is a function to process an item in the work queue.
type ProcessFunc func(interface{})
@ -64,8 +41,11 @@ type ScheduledWorkQueue interface {
type scheduledWorkQueue struct {
processFunc ProcessFunc
clock clock.Clock
work map[interface{}]stoppable
work map[interface{}]func()
workLock sync.Mutex
// Testing purposes.
afterFunc func(clock.Clock, time.Duration, func()) func()
}
// NewScheduledWorkQueue will create a new workqueue with the given processFunc
@ -73,8 +53,10 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled
return &scheduledWorkQueue{
processFunc: processFunc,
clock: clock,
work: make(map[interface{}]stoppable),
work: make(map[interface{}]func()),
workLock: sync.Mutex{},
afterFunc: afterFunc,
}
}
@ -84,7 +66,12 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled
func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
s.workLock.Lock()
defer s.workLock.Unlock()
s.forget(obj)
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
s.work[obj] = afterFunc(s.clock, duration, func() {
defer s.Forget(obj)
s.processFunc(obj)
@ -95,13 +82,40 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
func (s *scheduledWorkQueue) Forget(obj interface{}) {
s.workLock.Lock()
defer s.workLock.Unlock()
s.forget(obj)
}
// forget cancels and removes an item. It *must* be called with the lock already held
func (s *scheduledWorkQueue) forget(obj interface{}) {
if timer, ok := s.work[obj]; ok {
timer.Stop()
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
}
// We are writting our own time.AfterFunc to be able to mock the clock. The
// cancel function can be called concurrently.
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
t := c.NewTimer(d)
// The caller expects `cancel` to stop the goroutine. Since `t.C` is not
// closed after calling `t.Stop`, we need to create our own `cancelCh`
// channel to stop the goroutine.
cancelCh := make(chan struct{})
cancelOnce := sync.Once{}
cancel = func() {
t.Stop()
cancelOnce.Do(func() {
close(cancelCh)
})
}
go func() {
defer cancel()
select {
case <-t.C():
f()
case <-cancelCh:
return
}
}()
return cancel
}

View File

@ -17,16 +17,65 @@ limitations under the License.
package scheduler
import (
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/utils/clock"
)
func Test_afterFunc(t *testing.T) {
// Note that re-implimenting AfterFunc is not a good idea, since testing it
// is tricky as seen in time_test.go in the standard library. We will just
// focus on two important cases: "f" should be run after the duration
t.Run("stop works", func(t *testing.T) {
// This test makes sure afterFunc does not leak goroutines.
//
// This test may be run concurrently to other tests, so we want to avoid
// being affected by the goroutines from other tests. To do that, we start a
// huge number of afterFunc and check that the number of goroutines before
// and after more or less match.
expected := runtime.NumGoroutine()
var cancels []func()
for i := 1; i <= 10000; i++ {
cancels = append(cancels, afterFunc(clock.RealClock{}, 1*time.Hour, func() {
t.Errorf("should never be called")
}))
}
for _, cancel := range cancels {
cancel()
}
// We don't know when the goroutines will actually finish.
time.Sleep(100 * time.Millisecond)
t.Logf("%d goroutines before, %d goroutines after", expected, runtime.NumGoroutine())
assert.InDelta(t, expected, runtime.NumGoroutine(), 100)
})
t.Run("f is called after 100 milliseconds", func(t *testing.T) {
var end time.Time
wait := make(chan struct{})
start := time.Now()
afterFunc(clock.RealClock{}, 100*time.Millisecond, func() {
end = time.Now()
close(wait)
})
<-wait
assert.InDelta(t, 100*time.Millisecond, end.Sub(start), float64(1*time.Millisecond))
})
}
func TestAdd(t *testing.T) {
after := newMockAfter()
afterFunc = after.AfterFunc
var wg sync.WaitGroup
type testT struct {
@ -56,6 +105,7 @@ func TestAdd(t *testing.T) {
}
waitSubtest <- struct{}{}
})
queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc
queue.Add(test.obj, test.duration)
after.warp(test.duration + time.Millisecond)
<-waitSubtest
@ -68,7 +118,6 @@ func TestAdd(t *testing.T) {
func TestForget(t *testing.T) {
after := newMockAfter()
afterFunc = after.AfterFunc
var wg sync.WaitGroup
type testT struct {
@ -85,9 +134,10 @@ func TestForget(t *testing.T) {
t.Run(test.obj, func(test testT) func(*testing.T) {
return func(t *testing.T) {
defer wg.Done()
queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) {
queue := NewScheduledWorkQueue(clock.RealClock{}, func(_ interface{}) {
t.Errorf("scheduled function should never be called")
})
queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc
queue.Add(test.obj, test.duration)
queue.Forget(test.obj)
after.warp(test.duration * 2)
@ -102,11 +152,12 @@ func TestForget(t *testing.T) {
// doesn't end up hitting a data-race / leaking a timer.
func TestConcurrentAdd(t *testing.T) {
after := newMockAfter()
afterFunc = after.AfterFunc
var wg sync.WaitGroup
queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) {
t.Fatalf("should not be called, but was called with %v", obj)
})
queue.(*scheduledWorkQueue).afterFunc = after.AfterFunc
for i := 0; i < 1000; i++ {
wg.Add(1)
@ -147,7 +198,7 @@ func newMockAfter() *mockAfter {
}
}
func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppable {
func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) func() {
m.lock.Lock()
defer m.lock.Unlock()
@ -156,7 +207,9 @@ func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppabl
t: m.currentTime.Add(d),
}
m.queue = append(m.queue, item)
return item
return func() {
item.Stop()
}
}
func (m *mockAfter) warp(d time.Duration) {