memory leak: clean up scheduler goroutine on certificate deletion
Signed-off-by: Maël Valais <mael@vls.dev>
This commit is contained in:
parent
8e872632f4
commit
641960b666
@ -158,6 +158,13 @@ func NewController(
|
||||
}
|
||||
|
||||
func (c *controller) ProcessItem(ctx context.Context, key string) error {
|
||||
// We don't want to leak goroutines, so let's remove this key from the
|
||||
// scheduled queue in case it has been scheduled. We only need to
|
||||
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
|
||||
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
|
||||
// there is no unit test around this.
|
||||
c.scheduledWorkQueue.Forget(key)
|
||||
|
||||
log := logf.FromContext(ctx)
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
|
||||
@ -124,6 +124,13 @@ func NewController(
|
||||
}
|
||||
|
||||
func (c *controller) ProcessItem(ctx context.Context, key string) error {
|
||||
// We don't want to leak goroutines, so let's remove this key from the
|
||||
// scheduled queue in case it has been scheduled. We only need to
|
||||
// un-schedule the key for 'Deleted' events, but ProcessItem does not allow
|
||||
// us to distinguish between 'Added', 'Updated' and 'Deleted'. Note that
|
||||
// there is no unit test around this.
|
||||
c.scheduledWorkQueue.Forget(key)
|
||||
|
||||
log := logf.FromContext(ctx).WithValues("key", key)
|
||||
ctx = logf.NewContext(ctx, log)
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
|
||||
@ -23,36 +23,6 @@ import (
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// We are writting our own time.AfterFunc to be able to mock the clock. The
|
||||
// cancel function can be called concurrently.
|
||||
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
|
||||
t := c.NewTimer(d)
|
||||
cancelCh := make(chan struct{})
|
||||
cancelOnce := sync.Once{}
|
||||
cancel = func() {
|
||||
t.Stop()
|
||||
cancelOnce.Do(func() {
|
||||
close(cancelCh)
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-t.C():
|
||||
// We don't need to check whether the channel has returned a zero
|
||||
// value since t.C is never closed as per the timer.Stop
|
||||
// documentation.
|
||||
f()
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
// ProcessFunc is a function to process an item in the work queue.
|
||||
type ProcessFunc func(interface{})
|
||||
|
||||
@ -118,3 +88,34 @@ func (s *scheduledWorkQueue) Forget(obj interface{}) {
|
||||
delete(s.work, obj)
|
||||
}
|
||||
}
|
||||
|
||||
// We are writting our own time.AfterFunc to be able to mock the clock. The
|
||||
// cancel function can be called concurrently.
|
||||
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
|
||||
t := c.NewTimer(d)
|
||||
|
||||
// The caller expects `cancel` to stop the goroutine. Since `t.C` is not
|
||||
// closed after calling `t.Stop`, we need to create our own `cancelCh`
|
||||
// channel to stop the goroutine.
|
||||
cancelCh := make(chan struct{})
|
||||
cancelOnce := sync.Once{}
|
||||
cancel = func() {
|
||||
t.Stop()
|
||||
cancelOnce.Do(func() {
|
||||
close(cancelCh)
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-t.C():
|
||||
f()
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user