diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b3ff3c29a..ac99ef5d0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -5,10 +5,18 @@ import ( "time" ) +// ProcessFunc is a function to process an item in the work queue. type ProcessFunc func(interface{}) +// ScheduledWorkQueue is an interface to describe a queue that will execute the +// given ProcessFunc with the object given to Add once the time.Duration is up, +// since the time of calling Add. type ScheduledWorkQueue interface { + // Add will add an item to this queue, executing the ProcessFunc after the + // Duration has come (since the time Add was called). If an existing Timer + // for obj already exists, the previous timer will be cancelled. Add(interface{}, time.Duration) + // Forget will cancel the timer for the given object, if the timer exists. Forget(interface{}) } @@ -18,10 +26,14 @@ type scheduledWorkQueue struct { workLock sync.Mutex } +// NewScheduledWorkQueue will create a new workqueue with the given processFunc func NewScheduledWorkQueue(processFunc ProcessFunc) ScheduledWorkQueue { return &scheduledWorkQueue{processFunc, make(map[interface{}]*time.Timer), sync.Mutex{}} } +// Add will add an item to this queue, executing the ProcessFunc after the +// Duration has come (since the time Add was called). If an existing Timer for +// obj already exists, the previous timer will be cancelled. func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) { s.clearTimer(obj) s.work[obj] = time.AfterFunc(duration, func() { @@ -30,6 +42,7 @@ func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) { }) } +// Forget will cancel the timer for the given object, if the timer exists. func (s *scheduledWorkQueue) Forget(obj interface{}) { s.clearTimer(obj) }