Use 'clock' package in pkg/scheduler

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2020-06-23 16:15:04 +01:00
parent 281b9ffcbd
commit 1d6424b8f2
4 changed files with 35 additions and 13 deletions

View File

@ -119,13 +119,13 @@ func (c *certificateRequestManager) Register(ctx *controllerpkg.Context) (workqu
certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: controllerpkg.HandleOwnedResourceNamespacedFunc(log, c.queue, certificateGvk, certificateGetter(c.certificateLister))})
secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: secretResourceHandler(log, c.certificateLister, c.queue)})
// clock is used to determine whether certificates need renewal
c.clock = ctx.Clock
// Create a scheduled work queue that calls the ctrl.queue.Add method for
// each object in the queue. This is used to schedule re-checks of
// Certificate resources when they get near to expiry
c.scheduledWorkQueue = scheduler.NewScheduledWorkQueue(c.queue.Add)
// clock is used to determine whether certificates need renewal
c.clock = clock.RealClock{}
c.scheduledWorkQueue = scheduler.NewScheduledWorkQueue(c.clock, c.queue.Add)
// recorder records events about resources to the Kubernetes api
c.recorder = ctx.Recorder

View File

@ -5,12 +5,14 @@ go_library(
srcs = ["scheduler.go"],
importpath = "github.com/jetstack/cert-manager/pkg/scheduler",
visibility = ["//visibility:public"],
deps = ["@io_k8s_utils//clock:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["scheduler_test.go"],
embed = [":go_default_library"],
deps = ["@io_k8s_utils//clock:go_default_library"],
)
filegroup(

View File

@ -19,14 +19,26 @@ package scheduler
import (
"sync"
"time"
"k8s.io/utils/clock"
)
// For mocking purposes.
// This little bit of wrapping needs to be done because go doesn't do
// covariance, but it does coerce *time.Timer into stoppable implicitly if we
// write it out like so.
var afterFunc = func(d time.Duration, f func()) stoppable {
return time.AfterFunc(d, f)
var afterFunc = func(c clock.Clock, d time.Duration, f func()) stoppable {
t := c.NewTimer(d)
go func() {
defer t.Stop()
if ti := <-t.C(); ti == (time.Time{}) {
return
}
f()
}()
return t
}
// stoppable is the subset of time.Timer which we use, split out for mocking purposes
@ -51,13 +63,19 @@ type ScheduledWorkQueue interface {
type scheduledWorkQueue struct {
processFunc ProcessFunc
clock clock.Clock
work map[interface{}]stoppable
workLock sync.Mutex
}
// NewScheduledWorkQueue will create a new workqueue with the given processFunc
func NewScheduledWorkQueue(processFunc ProcessFunc) ScheduledWorkQueue {
return &scheduledWorkQueue{processFunc, make(map[interface{}]stoppable), sync.Mutex{}}
func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) ScheduledWorkQueue {
return &scheduledWorkQueue{
processFunc: processFunc,
clock: clock,
work: make(map[interface{}]stoppable),
workLock: sync.Mutex{},
}
}
// Add will add an item to this queue, executing the ProcessFunc after the
@ -67,7 +85,7 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
s.workLock.Lock()
defer s.workLock.Unlock()
s.forget(obj)
s.work[obj] = afterFunc(duration, func() {
s.work[obj] = afterFunc(s.clock, duration, func() {
defer s.Forget(obj)
s.processFunc(obj)
})

View File

@ -20,6 +20,8 @@ import (
"sync"
"testing"
"time"
"k8s.io/utils/clock"
)
func TestAdd(t *testing.T) {
@ -42,7 +44,7 @@ func TestAdd(t *testing.T) {
waitSubtest := make(chan struct{})
return func(t *testing.T) {
startTime := after.currentTime
queue := NewScheduledWorkQueue(func(obj interface{}) {
queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) {
defer wg.Done()
durationEarly := test.duration - after.currentTime.Sub(startTime)
@ -83,7 +85,7 @@ func TestForget(t *testing.T) {
t.Run(test.obj, func(test testT) func(*testing.T) {
return func(t *testing.T) {
defer wg.Done()
queue := NewScheduledWorkQueue(func(obj interface{}) {
queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) {
t.Errorf("scheduled function should never be called")
})
queue.Add(test.obj, test.duration)
@ -102,7 +104,7 @@ func TestConcurrentAdd(t *testing.T) {
after := newMockAfter()
afterFunc = after.AfterFunc
var wg sync.WaitGroup
queue := NewScheduledWorkQueue(func(obj interface{}) {
queue := NewScheduledWorkQueue(clock.RealClock{}, func(obj interface{}) {
t.Fatalf("should not be called, but was called with %v", obj)
})
@ -146,7 +148,7 @@ func newMockAfter() *mockAfter {
}
}
func (m *mockAfter) AfterFunc(d time.Duration, f func()) stoppable {
func (m *mockAfter) AfterFunc(c clock.Clock, d time.Duration, f func()) stoppable {
m.lock.Lock()
defer m.lock.Unlock()