From 9d8070d9af9efc1928dceb1de10f528f63b67ff9 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 23 May 2019 13:44:57 +0100 Subject: [PATCH] Add base controller Signed-off-by: Michael Tsang --- pkg/controller/BUILD.bazel | 3 + pkg/controller/base_controller.go | 128 ++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 pkg/controller/base_controller.go diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 80db00715..1f98604ad 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "base_controller.go", "context.go", "helper.go", "register.go", @@ -14,8 +15,10 @@ go_library( "//pkg/apis/certmanager/v1alpha1:go_default_library", "//pkg/client/clientset/versioned:go_default_library", "//pkg/client/informers/externalversions:go_default_library", + "//pkg/logs:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", diff --git a/pkg/controller/base_controller.go b/pkg/controller/base_controller.go new file mode 100644 index 000000000..4e39fc334 --- /dev/null +++ b/pkg/controller/base_controller.go @@ -0,0 +1,128 @@ +/* +Copyright 2019 The Jetstack cert-manager contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + logf "github.com/jetstack/cert-manager/pkg/logs" +) + +type BaseController struct { + // the controllers root context, containing a controller scoped logger + Ctx context.Context + *Context + + // To allow injection for testing. + syncHandler func(ctx context.Context, key string) error + + watchedInformers []cache.InformerSynced + Queue workqueue.RateLimitingInterface +} + +func New(ctx *Context, controllerName string, syncHandler func(ctx context.Context, key string) error) *BaseController { + bctrl := &BaseController{Context: ctx} + bctrl.syncHandler = syncHandler + bctrl.Ctx = logf.NewContext(ctx.RootContext, nil, controllerName) + return bctrl +} + +func (bctrl *BaseController) AddQueuing(rateLimiter workqueue.RateLimiter, name string, informer cache.SharedIndexInformer) { + bctrl.Queue = workqueue.NewNamedRateLimitingQueue(rateLimiter, name) + informer.AddEventHandler(&QueuingEventHandler{Queue: bctrl.Queue}) + bctrl.watchedInformers = append(bctrl.watchedInformers, informer.HasSynced) +} + +func (bctrl *BaseController) AddHandled(informer cache.SharedIndexInformer, handler cache.ResourceEventHandler) { + informer.AddEventHandler(handler) + bctrl.watchedInformers = append(bctrl.watchedInformers, informer.HasSynced) +} + +func (bctrl *BaseController) AddWatched(informers ...cache.SharedIndexInformer) { + for _, informer := range informers { + bctrl.watchedInformers = append(bctrl.watchedInformers, informer.HasSynced) + } +} + +func (bc *BaseController) Run(workers int, stopCh <-chan struct{}) error { + ctx, cancel := context.WithCancel(bc.Ctx) + defer cancel() + log := logf.FromContext(ctx) + + log.Info("starting control loop") + // wait for all the informer caches we depend on are synced + if !cache.WaitForCacheSync(stopCh, bc.watchedInformers...) { + // TODO: replace with Errorf call to glog + return fmt.Errorf("error waiting for informer caches to sync") + } + + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + // TODO (@munnerz): make time.Second duration configurable + go wait.Until(func() { + defer wg.Done() + bc.worker(ctx) + }, time.Second, stopCh) + } + <-stopCh + log.Info("shutting down queue as workqueue signaled shutdown") + bc.Queue.ShutDown() + log.V(logf.DebugLevel).Info("waiting for workers to exit...") + wg.Wait() + log.V(logf.DebugLevel).Info("workers exited") + return nil +} + +func (bc *BaseController) worker(ctx context.Context) { + log := logf.FromContext(bc.Ctx) + + log.V(logf.DebugLevel).Info("starting worker") + for { + obj, shutdown := bc.Queue.Get() + if shutdown { + break + } + + var key string + // use an inlined function so we can use defer + func() { + defer bc.Queue.Done(obj) + var ok bool + if key, ok = obj.(string); !ok { + return + } + log := log.WithValues("key", key) + log.Info("syncing item") + if err := bc.syncHandler(ctx, key); err != nil { + log.Error(err, "re-queuing item due to error processing") + bc.Queue.AddRateLimited(obj) + return + } + log.Info("finished processing work item") + bc.Queue.Forget(obj) + }() + } + log.V(logf.DebugLevel).Info("exiting worker loop") +}