Wait for workers to process their work before exit

This commit is contained in:
James Munnelly 2017-09-09 01:48:10 +01:00
parent 6cfaf3d012
commit b53ede4e5a
2 changed files with 19 additions and 8 deletions

View File

@ -2,6 +2,7 @@ package certificates
import (
"fmt"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@ -50,6 +51,7 @@ type Controller struct {
queue workqueue.RateLimitingInterface
scheduledWorkQueue scheduler.ScheduledWorkQueue
workerWg sync.WaitGroup
}
// New returns a new Certificates controller. It sets up the informer handler
@ -131,8 +133,6 @@ func (c *Controller) ingressDeleted(obj interface{}) {
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer c.queue.ShutDown()
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend to sync
if !cache.WaitForCacheSync(stopCh,
@ -145,15 +145,21 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
glog.V(4).Infof("Synced all caches for %s control loop", ControllerName)
for i := 0; i < workers; i++ {
c.workerWg.Add(1)
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
glog.V(4).Infof("Shutting down queue as workqueue signalled shutdown")
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
glog.V(4).Infof("Waiting for workers to exit...")
c.workerWg.Wait()
glog.V(4).Infof("Workers exited.")
return nil
}
func (c *Controller) worker() {
defer c.workerWg.Done()
glog.V(4).Infof("Starting %s worker", ControllerName)
for {
obj, shutdown := c.queue.Get()

View File

@ -3,6 +3,7 @@ package issuers
import (
"fmt"
"log"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@ -39,7 +40,8 @@ type Controller struct {
secretInformerSynced cache.InformerSynced
secretLister corelisters.SecretLister
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface
workerWg sync.WaitGroup
}
func New(
@ -89,8 +91,6 @@ func (c *Controller) secretDeleted(obj interface{}) {
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer c.queue.ShutDown()
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.issuerInformerSynced, c.secretInformerSynced) {
@ -99,16 +99,21 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
}
for i := 0; i < workers; i++ {
c.workerWg.Add(1)
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
log.Printf("shutting down queue as workqueue signalled shutdown")
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
glog.V(4).Infof("Waiting for workers to exit...")
c.workerWg.Wait()
glog.V(4).Infof("Workers exited.")
return nil
}
func (c *Controller) worker() {
defer c.workerWg.Done()
log.Printf("starting worker")
for {
obj, shutdown := c.queue.Get()