diff --git a/pkg/controller/certificates/controller.go b/pkg/controller/certificates/controller.go index 6a8f95b16..381d1aac7 100644 --- a/pkg/controller/certificates/controller.go +++ b/pkg/controller/certificates/controller.go @@ -2,6 +2,7 @@ package certificates import ( "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -50,6 +51,7 @@ type Controller struct { queue workqueue.RateLimitingInterface scheduledWorkQueue scheduler.ScheduledWorkQueue + workerWg sync.WaitGroup } // New returns a new Certificates controller. It sets up the informer handler @@ -131,8 +133,6 @@ func (c *Controller) ingressDeleted(obj interface{}) { } func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { - defer c.queue.ShutDown() - glog.V(4).Infof("Starting %s control loop", ControllerName) // wait for all the informer caches we depend to sync if !cache.WaitForCacheSync(stopCh, @@ -145,15 +145,21 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { glog.V(4).Infof("Synced all caches for %s control loop", ControllerName) for i := 0; i < workers; i++ { + c.workerWg.Add(1) // TODO (@munnerz): make time.Second duration configurable go wait.Until(c.worker, time.Second, stopCh) } <-stopCh - glog.V(4).Infof("Shutting down queue as workqueue signalled shutdown") + glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown") + c.queue.ShutDown() + glog.V(4).Infof("Waiting for workers to exit...") + c.workerWg.Wait() + glog.V(4).Infof("Workers exited.") return nil } func (c *Controller) worker() { + defer c.workerWg.Done() glog.V(4).Infof("Starting %s worker", ControllerName) for { obj, shutdown := c.queue.Get() diff --git a/pkg/controller/issuers/controller.go b/pkg/controller/issuers/controller.go index 05317706e..8948a3bdb 100644 --- a/pkg/controller/issuers/controller.go +++ b/pkg/controller/issuers/controller.go @@ -3,6 +3,7 @@ package issuers import ( "fmt" "log" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -39,7 +40,8 @@ type Controller struct { secretInformerSynced cache.InformerSynced secretLister corelisters.SecretLister - queue workqueue.RateLimitingInterface + queue workqueue.RateLimitingInterface + workerWg sync.WaitGroup } func New( @@ -89,8 +91,6 @@ func (c *Controller) secretDeleted(obj interface{}) { } func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { - defer c.queue.ShutDown() - glog.V(4).Infof("Starting %s control loop", ControllerName) // wait for all the informer caches we depend on are synced if !cache.WaitForCacheSync(stopCh, c.issuerInformerSynced, c.secretInformerSynced) { @@ -99,16 +99,21 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { } for i := 0; i < workers; i++ { + c.workerWg.Add(1) // TODO (@munnerz): make time.Second duration configurable go wait.Until(c.worker, time.Second, stopCh) } - <-stopCh - log.Printf("shutting down queue as workqueue signalled shutdown") + glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown") + c.queue.ShutDown() + glog.V(4).Infof("Waiting for workers to exit...") + c.workerWg.Wait() + glog.V(4).Infof("Workers exited.") return nil } func (c *Controller) worker() { + defer c.workerWg.Done() log.Printf("starting worker") for { obj, shutdown := c.queue.Get()