cert-manager/pkg/controller/base.go
2017-07-19 20:29:02 +01:00

63 lines
1.6 KiB
Go

package controller
import (
"reflect"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Base holds the state shared by the controller run loop in this file: a
// work queue fed by informer event handlers, the function used to process
// queued items, and the informer sync checks that Run waits on.
type Base struct {
// Context is the controller context; its Logger is used for all log
// output produced by Run and worker.
Context Context
// TODO (@munnerz): come up with some way to swap out this queue type
// Queue receives the objects delivered by handlers registered through
// AddHandler and is shut down when Run returns.
Queue workqueue.RateLimitingInterface
// Worker is called repeatedly by each worker goroutine; the goroutine's
// processing loop ends as soon as Worker returns false.
Worker func() bool
// hasSynced collects the HasSynced function of every informer passed to
// AddHandler so that Run can wait for those caches before starting workers.
hasSynced []cache.InformerSynced
}
// AddHandler registers event handlers on informer that feed this
// controller's Queue, and remembers informer.HasSynced so that Run can wait
// for the informer's cache.
//
// Objects from add and delete events are enqueued exactly as delivered.
// Objects from update events are enqueued only when reflect.DeepEqual reports
// that the old and new objects differ.
func (b *Base) AddHandler(informer cache.SharedIndexInformer) {
	// Bound once, at registration time, and shared by the add and delete
	// callbacks.
	enqueue := b.Queue.Add

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		DeleteFunc: enqueue,
		UpdateFunc: func(prev, next interface{}) {
			if reflect.DeepEqual(prev, next) {
				// Nothing changed between the two versions; skip the enqueue.
				return
			}
			b.Queue.Add(next)
		},
	}

	informer.AddEventHandler(handlers)
	b.hasSynced = append(b.hasSynced, informer.HasSynced)
}
// Run starts this controller's run loop with the given number of worker
// goroutines. It first waits for the caches of every informer registered via
// AddHandler to sync; if that wait fails it logs an error and returns without
// starting any workers. Otherwise it blocks until a receive on stopCh
// completes (a value is sent or the channel is closed).
//
// The Queue is shut down when Run returns, on either path.
func (b *Base) Run(workers int, stopCh <-chan struct{}) {
	defer b.Queue.ShutDown()

	b.Context.Logger.Printf("Starting control loop")

	// Workers must not start until every informer cache we depend on is synced.
	synced := cache.WaitForCacheSync(stopCh, b.hasSynced...)
	if !synced {
		b.Context.Logger.Errorf("error waiting for informer caches to sync")
		return
	}

	// Launch one goroutine per requested worker; nothing is started when
	// workers is zero or negative.
	for remaining := workers; remaining > 0; remaining-- {
		// TODO (@munnerz): make time.Second duration configurable
		go wait.Until(b.worker, time.Second, stopCh)
	}

	<-stopCh
	b.Context.Logger.Printf("shutting down queue as workqueue signalled shutdown")
}
// worker is the body of a single worker goroutine. It calls b.Worker in a
// loop, logging after each call that returns true, and stops at the first
// call that returns false.
func (b *Base) worker() {
	b.Context.Logger.Printf("starting worker")

	for {
		if !b.Worker() {
			break
		}
		b.Context.Logger.Printf("finished processing work item")
	}

	b.Context.Logger.Printf("exiting worker loop")
}