diff --git a/pkg/controller/certificates/controller.go b/pkg/controller/certificates/controller.go index 1ea21a8fe..000de0acf 100644 --- a/pkg/controller/certificates/controller.go +++ b/pkg/controller/certificates/controller.go @@ -119,13 +119,13 @@ func (c *certificateRequestManager) Register(ctx *controllerpkg.Context) (workqu certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: controllerpkg.HandleOwnedResourceNamespacedFunc(log, c.queue, certificateGvk, certificateGetter(c.certificateLister))}) secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: secretResourceHandler(log, c.certificateLister, c.queue)}) + // clock is used to determine whether certificates need renewal + c.clock = ctx.Clock + // Create a scheduled work queue that calls the ctrl.queue.Add method for // each object in the queue. This is used to schedule re-checks of // Certificate resources when they get near to expiry - c.scheduledWorkQueue = scheduler.NewScheduledWorkQueue(c.queue.Add) - - // clock is used to determine whether certificates need renewal - c.clock = clock.RealClock{} + c.scheduledWorkQueue = scheduler.NewScheduledWorkQueue(c.clock, c.queue.Add) // recorder records events about resources to the Kubernetes api c.recorder = ctx.Recorder diff --git a/pkg/scheduler/BUILD.bazel b/pkg/scheduler/BUILD.bazel index 20810b4c4..fbc03a1cb 100644 --- a/pkg/scheduler/BUILD.bazel +++ b/pkg/scheduler/BUILD.bazel @@ -5,12 +5,14 @@ go_library( srcs = ["scheduler.go"], importpath = "github.com/jetstack/cert-manager/pkg/scheduler", visibility = ["//visibility:public"], + deps = ["@io_k8s_utils//clock:go_default_library"], ) go_test( name = "go_default_test", srcs = ["scheduler_test.go"], embed = [":go_default_library"], + deps = ["@io_k8s_utils//clock:go_default_library"], ) filegroup( diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7142a56ee..b31e5dbb7 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,14 +19,26 @@ package scheduler import ( "sync" "time" + + "k8s.io/utils/clock" ) // For mocking purposes. // This little bit of wrapping needs to be done because go doesn't do // covariance, but it does coerce *time.Timer into stoppable implicitly if we // write it out like so. -var afterFunc = func(d time.Duration, f func()) stoppable { - return time.AfterFunc(d, f) +var afterFunc = func(c clock.Clock, d time.Duration, f func()) stoppable { + t := c.NewTimer(d) + + go func() { + defer t.Stop() + if ti := <-t.C(); ti == (time.Time{}) { + return + } + f() + }() + + return t } // stoppable is the subset of time.Timer which we use, split out for mocking purposes @@ -51,13 +63,19 @@ type ScheduledWorkQueue interface { type scheduledWorkQueue struct { processFunc ProcessFunc + clock clock.Clock work map[interface{}]stoppable workLock sync.Mutex } // NewScheduledWorkQueue will create a new workqueue with the given processFunc -func NewScheduledWorkQueue(processFunc ProcessFunc) ScheduledWorkQueue { - return &scheduledWorkQueue{processFunc, make(map[interface{}]stoppable), sync.Mutex{}} +func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) ScheduledWorkQueue { + return &scheduledWorkQueue{ + processFunc: processFunc, + clock: clock, + work: make(map[interface{}]stoppable), + workLock: sync.Mutex{}, + } } // Add will add an item to this queue, executing the ProcessFunc after the @@ -67,7 +85,7 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) { s.workLock.Lock() defer s.workLock.Unlock() s.forget(obj) - s.work[obj] = afterFunc(duration, func() { + s.work[obj] = afterFunc(s.clock, duration, func() { defer s.Forget(obj) s.processFunc(obj) }) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 06d7c900d..33cfd1b1f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -20,6 +20,8 @@ import ( "sync" "testing" "time" + + "k8s.io/utils/clock" ) func TestAdd(t *testing.T) { @@ -42,7 +44,7 @@ func TestAdd(t *testing.T) { waitSubtest := make(chan struct{}) return func(t *testing.T) { startTime := after.currentTime - queue := NewScheduledWorkQueue(func(obj interface{}) { + queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) { defer wg.Done() durationEarly := test.duration - after.currentTime.Sub(startTime) @@ -83,7 +85,7 @@ func TestForget(t *testing.T) { t.Run(test.obj, func(test testT) func(*testing.T) { return func(t *testing.T) { defer wg.Done() - queue := NewScheduledWorkQueue(func(obj interface{}) { + queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) { t.Errorf("scheduled function should never be called") }) queue.Add(test.obj, test.duration) @@ -102,7 +104,7 @@ func TestConcurrentAdd(t *testing.T) { after := newMockAfter() afterFunc = after.AfterFunc var wg sync.WaitGroup - queue := NewScheduledWorkQueue(func(obj interface{}) { + queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) { t.Fatalf("should not be called, but was called with %v", obj) }) @@ -146,7 +148,7 @@ func newMockAfter() *mockAfter { } } -func (m *mockAfter) AfterFunc(d time.Duration, f func()) stoppable { +func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppable { m.lock.Lock() defer m.lock.Unlock()