176 lines
3.8 KiB
Go
176 lines
3.8 KiB
Go
/*
|
|
Copyright 2019 The Jetstack cert-manager contributors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package scheduler
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestAdd(t *testing.T) {
|
|
after := newMockAfter()
|
|
afterFunc = after.AfterFunc
|
|
|
|
var wg sync.WaitGroup
|
|
type testT struct {
|
|
obj string
|
|
duration time.Duration
|
|
}
|
|
tests := []testT{
|
|
{"test500", time.Millisecond * 500},
|
|
{"test1000", time.Second * 1},
|
|
{"test3000", time.Second * 3},
|
|
}
|
|
for _, test := range tests {
|
|
wg.Add(1)
|
|
t.Run(test.obj, func(test testT) func(*testing.T) {
|
|
waitSubtest := make(chan struct{})
|
|
return func(t *testing.T) {
|
|
startTime := after.currentTime
|
|
queue := NewScheduledWorkQueue(func(obj interface{}) {
|
|
defer wg.Done()
|
|
durationEarly := test.duration - after.currentTime.Sub(startTime)
|
|
|
|
if durationEarly > 0 {
|
|
t.Errorf("got queue item %.2f seconds too early", float64(durationEarly)/float64(time.Second))
|
|
}
|
|
if obj != test.obj {
|
|
t.Errorf("expected obj '%+v' but got obj '%+v'", test.obj, obj)
|
|
}
|
|
waitSubtest <- struct{}{}
|
|
})
|
|
queue.Add(test.obj, test.duration)
|
|
after.warp(test.duration + time.Millisecond)
|
|
<-waitSubtest
|
|
}
|
|
}(test))
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestForget(t *testing.T) {
|
|
after := newMockAfter()
|
|
afterFunc = after.AfterFunc
|
|
|
|
var wg sync.WaitGroup
|
|
type testT struct {
|
|
obj string
|
|
duration time.Duration
|
|
}
|
|
tests := []testT{
|
|
{"test500", time.Millisecond * 500},
|
|
{"test1000", time.Second * 1},
|
|
{"test3000", time.Second * 3},
|
|
}
|
|
for _, test := range tests {
|
|
wg.Add(1)
|
|
t.Run(test.obj, func(test testT) func(*testing.T) {
|
|
return func(t *testing.T) {
|
|
defer wg.Done()
|
|
queue := NewScheduledWorkQueue(func(obj interface{}) {
|
|
t.Errorf("scheduled function should never be called")
|
|
})
|
|
queue.Add(test.obj, test.duration)
|
|
queue.Forget(test.obj)
|
|
after.warp(test.duration * 2)
|
|
}
|
|
}(test))
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// TestConcurrentAdd checks that if we add the same item concurrently, it
|
|
// doesn't end up hitting a data-race / leaking a timer.
|
|
func TestConcurrentAdd(t *testing.T) {
|
|
after := newMockAfter()
|
|
afterFunc = after.AfterFunc
|
|
var wg sync.WaitGroup
|
|
queue := NewScheduledWorkQueue(func(obj interface{}) {
|
|
t.Fatalf("should not be called, but was called with %v", obj)
|
|
})
|
|
|
|
for i := 0; i < 1000; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
queue.Add(1, 1*time.Second)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
queue.Forget(1)
|
|
after.warp(5 * time.Second)
|
|
}
|
|
|
|
type timerQueueItem struct {
|
|
f func()
|
|
t time.Time
|
|
run bool
|
|
stopped bool
|
|
}
|
|
|
|
func (tq *timerQueueItem) Stop() bool {
|
|
stopped := tq.stopped
|
|
tq.stopped = true
|
|
return stopped
|
|
}
|
|
|
|
type mockAfter struct {
|
|
lock *sync.Mutex
|
|
startTime time.Time
|
|
currentTime time.Time
|
|
queue []*timerQueueItem
|
|
}
|
|
|
|
func newMockAfter() *mockAfter {
|
|
return &mockAfter{
|
|
queue: make([]*timerQueueItem, 0),
|
|
lock: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func (m *mockAfter) AfterFunc(d time.Duration, f func()) stoppable {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
item := &timerQueueItem{
|
|
f: f,
|
|
t: m.currentTime.Add(d),
|
|
}
|
|
m.queue = append(m.queue, item)
|
|
return item
|
|
}
|
|
|
|
func (m *mockAfter) warp(d time.Duration) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
m.currentTime = m.currentTime.Add(d)
|
|
for _, item := range m.queue {
|
|
if item.run || item.stopped {
|
|
continue
|
|
}
|
|
|
|
if item.t.Before(m.currentTime) {
|
|
item.run = true
|
|
go item.f()
|
|
}
|
|
}
|
|
}
|