memory leak: clean up scheduler goroutine on certificate deletion

Signed-off-by: Maël Valais <mael@vls.dev>
This commit is contained in:
Maël Valais 2021-07-20 16:15:39 +02:00
parent 8e872632f4
commit 641960b666
3 changed files with 45 additions and 30 deletions

View File

@ -158,6 +158,13 @@ func NewController(
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
// We don't want to leak goroutines, so let's remove this key from the
// scheduled queue in case it has been scheduled. We only need to
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
// there is no unit test around this.
c.scheduledWorkQueue.Forget(key)
log := logf.FromContext(ctx)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {

View File

@ -124,6 +124,13 @@ func NewController(
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
// We don't want to leak goroutines, so let's remove this key from the
// scheduled queue in case it has been scheduled. We only need to
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
// there is no unit test around this.
c.scheduledWorkQueue.Forget(key)
log := logf.FromContext(ctx).WithValues("key", key)
ctx = logf.NewContext(ctx, log)
namespace, name, err := cache.SplitMetaNamespaceKey(key)

View File

@ -23,36 +23,6 @@ import (
"k8s.io/utils/clock"
)
// We are writting our own time.AfterFunc to be able to mock the clock. The
// cancel function can be called concurrently.
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
t := c.NewTimer(d)
cancelCh := make(chan struct{})
cancelOnce := sync.Once{}
cancel = func() {
t.Stop()
cancelOnce.Do(func() {
close(cancelCh)
})
}
go func() {
defer cancel()
select {
case <-t.C():
// We don't need to check whether the channel has returned a zero
// value since t.C is never closed as per the timer.Stop
// documentation.
f()
case <-cancelCh:
return
}
}()
return cancel
}
// ProcessFunc is a function to process an item in the work queue.
type ProcessFunc func(interface{})
@ -118,3 +88,34 @@ func (s *scheduledWorkQueue) Forget(obj interface{}) {
delete(s.work, obj)
}
}
// We are writting our own time.AfterFunc to be able to mock the clock. The
// cancel function can be called concurrently.
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
t := c.NewTimer(d)
// The caller expects `cancel` to stop the goroutine. Since `t.C` is not
// closed after calling `t.Stop`, we need to create our own `cancelCh`
// channel to stop the goroutine.
cancelCh := make(chan struct{})
cancelOnce := sync.Once{}
cancel = func() {
t.Stop()
cancelOnce.Do(func() {
close(cancelCh)
})
}
go func() {
defer cancel()
select {
case <-t.C():
f()
case <-cancelCh:
return
}
}()
return cancel
}