memory leak: the afterFunc goroutine now stops properly

Signed-off-by: Maël Valais <mael@vls.dev>
This commit is contained in:
Maël Valais 2021-07-20 15:52:55 +02:00
parent eb947f98ea
commit 8e872632f4
2 changed files with 42 additions and 30 deletions

View File

@ -23,27 +23,34 @@ import (
"k8s.io/utils/clock"
)
// For mocking purposes.
// This little bit of wrapping needs to be done because go doesn't do
// covariance, but it does coerce *time.Timer into stoppable implicitly if we
// write it out like so.
func afterFunc(c clock.Clock, d time.Duration, f func()) stoppable {
// We are writting our own time.AfterFunc to be able to mock the clock. The
// cancel function can be called concurrently.
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
t := c.NewTimer(d)
cancelCh := make(chan struct{})
cancelOnce := sync.Once{}
cancel = func() {
t.Stop()
cancelOnce.Do(func() {
close(cancelCh)
})
}
go func() {
defer t.Stop()
if ti := <-t.C(); ti == (time.Time{}) {
defer cancel()
select {
case <-t.C():
// We don't need to check whether the channel has returned a zero
// value since t.C is never closed as per the timer.Stop
// documentation.
f()
case <-cancelCh:
return
}
f()
}()
return t
}
// stoppable is the subset of time.Timer which we use, split out for mocking purposes
type stoppable interface {
Stop() bool
return cancel
}
// ProcessFunc is a function to process an item in the work queue.
@ -64,11 +71,11 @@ type ScheduledWorkQueue interface {
type scheduledWorkQueue struct {
processFunc ProcessFunc
clock clock.Clock
work map[interface{}]stoppable
work map[interface{}]func()
workLock sync.Mutex
// Testing purposes.
afterFunc func(clock.Clock, time.Duration, func()) stoppable
afterFunc func(clock.Clock, time.Duration, func()) func()
}
// NewScheduledWorkQueue will create a new workqueue with the given processFunc
@ -76,8 +83,10 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled
return &scheduledWorkQueue{
processFunc: processFunc,
clock: clock,
work: make(map[interface{}]stoppable),
work: make(map[interface{}]func()),
workLock: sync.Mutex{},
afterFunc: afterFunc,
}
}
@ -87,7 +96,12 @@ func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) Scheduled
func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
s.workLock.Lock()
defer s.workLock.Unlock()
s.forget(obj)
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
s.work[obj] = afterFunc(s.clock, duration, func() {
defer s.Forget(obj)
s.processFunc(obj)
@ -98,13 +112,9 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
func (s *scheduledWorkQueue) Forget(obj interface{}) {
s.workLock.Lock()
defer s.workLock.Unlock()
s.forget(obj)
}
// forget cancels and removes an item. It *must* be called with the lock already held
func (s *scheduledWorkQueue) forget(obj interface{}) {
if timer, ok := s.work[obj]; ok {
timer.Stop()
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
}

View File

@ -39,15 +39,15 @@ func Test_afterFunc(t *testing.T) {
// huge number of afterFunc and check that the number of goroutines before
// and after more or less match.
expected := runtime.NumGoroutine()
var items []stoppable
var cancels []func()
for i := 1; i <= 10000; i++ {
items = append(items, afterFunc(clock.RealClock{}, 1*time.Hour, func() {
cancels = append(cancels, afterFunc(clock.RealClock{}, 1*time.Hour, func() {
t.Errorf("should never be called")
}))
}
for _, item := range items {
item.Stop()
for _, cancel := range cancels {
cancel()
}
// We don't know when the goroutines will actually finish.
@ -198,7 +198,7 @@ func newMockAfter() *mockAfter {
}
}
func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppable {
func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) func() {
m.lock.Lock()
defer m.lock.Unlock()
@ -207,7 +207,9 @@ func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppabl
t: m.currentTime.Add(d),
}
m.queue = append(m.queue, item)
return item
return func() {
item.Stop()
}
}
func (m *mockAfter) warp(d time.Duration) {