Update clusterissuers controller

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2019-02-28 22:25:19 +00:00
parent 6b24e9c966
commit c6c498338a
3 changed files with 38 additions and 28 deletions

View File

@ -16,17 +16,15 @@ go_library(
"//pkg/client/listers/certmanager/v1alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/issuer:go_default_library",
"//pkg/util:go_default_library",
"//pkg/logs:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -24,20 +24,19 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
controllerpkg "github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/issuer"
"github.com/jetstack/cert-manager/pkg/util"
logf "github.com/jetstack/cert-manager/pkg/logs"
)
type Controller struct {
ctx context.Context
controllerpkg.Context
issuerFactory issuer.IssuerFactory
@ -66,28 +65,34 @@ func New(ctx *controllerpkg.Context) *Controller {
ctrl.watchedInformers = append(ctrl.watchedInformers, secretsInformer.Informer().HasSynced)
ctrl.secretLister = secretsInformer.Lister()
ctrl.issuerFactory = issuer.NewIssuerFactory(ctx)
ctrl.ctx = logf.NewContext(ctx.RootContext, nil, ControllerName)
return ctrl
}
// TODO: replace with generic handleObjet function (like Navigator)
func (c *Controller) secretDeleted(obj interface{}) {
log := logf.FromContext(c.ctx, "secretDeleted")
var secret *corev1.Secret
var ok bool
secret, ok = obj.(*corev1.Secret)
if !ok {
runtime.HandleError(fmt.Errorf("Object was not a Secret object %#v", obj))
log.Error(nil, "object was not a Secret object")
return
}
log = logf.WithResource(log, secret)
issuers, err := c.issuersForSecret(secret)
if err != nil {
runtime.HandleError(fmt.Errorf("Error looking up issuers observing Secret: %s/%s", secret.Namespace, secret.Name))
log.Error(err, "error looking up issuers observing secret")
return
}
for _, iss := range issuers {
log := logf.WithRelatedResource(log, iss)
key, err := keyFunc(iss)
if err != nil {
runtime.HandleError(err)
log.Error(err, "error computing key for resource")
continue
}
c.queue.AddRateLimited(key)
@ -95,7 +100,11 @@ func (c *Controller) secretDeleted(obj interface{}) {
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
klog.V(4).Infof("Starting %s control loop", ControllerName)
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
log := logf.FromContext(ctx)
log.Info("starting control loop")
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.watchedInformers...) {
// TODO: replace with Errorf call to glog
@ -108,20 +117,21 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(func() {
defer wg.Done()
c.worker(stopCh)
c.worker(ctx)
}, time.Second, stopCh)
}
<-stopCh
klog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
log.V(logf.DebugLevel).Info("shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
klog.V(4).Infof("Waiting for workers to exit...")
log.V(logf.DebugLevel).Info("waiting for workers to exit...")
wg.Wait()
klog.V(4).Infof("Workers exited.")
log.V(logf.DebugLevel).Info("workers exited.")
return nil
}
func (c *Controller) worker(stopCh <-chan struct{}) {
klog.V(4).Infof("Starting %q worker", ControllerName)
func (c *Controller) worker(ctx context.Context) {
log := logf.FromContext(ctx)
log.V(logf.DebugLevel).Info("starting worker")
for {
obj, shutdown := c.queue.Get()
if shutdown {
@ -136,40 +146,40 @@ func (c *Controller) worker(stopCh <-chan struct{}) {
if key, ok = obj.(string); !ok {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = util.ContextWithStopCh(ctx, stopCh)
klog.Infof("%s controller: syncing item '%s'", ControllerName, key)
log := log.WithValues("key", key)
log.Info("syncing resource")
if err := c.syncHandler(ctx, key); err != nil {
klog.Errorf("%s controller: Re-queuing item %q due to error processing: %s", ControllerName, key, err.Error())
log.Error(err, "re-queuing item due to error processing")
c.queue.AddRateLimited(obj)
return
}
klog.Infof("%s controller: Finished processing work item %q", ControllerName, key)
log.Info("finished processing work item")
c.queue.Forget(obj)
}()
}
klog.V(4).Infof("Exiting %q worker loop", ControllerName)
log.V(logf.DebugLevel).Info("exiting worker loop")
}
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
log := logf.FromContext(ctx)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
log.Error(nil, "invalid resource key")
return nil
}
issuer, err := c.clusterIssuerLister.Get(name)
if err != nil {
if k8sErrors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("issuer %q in work queue no longer exists", key))
log.Error(err, "clusterissuer in work queue no longer exists")
return nil
}
return err
}
ctx = logf.NewContext(ctx, logf.WithResource(log, issuer))
return c.Sync(ctx, issuer)
}

View File

@ -23,11 +23,11 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog"
apiutil "github.com/jetstack/cert-manager/pkg/api/util"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/validation"
logf "github.com/jetstack/cert-manager/pkg/logs"
)
const (
@ -38,6 +38,8 @@ const (
)
func (c *Controller) Sync(ctx context.Context, iss *v1alpha1.ClusterIssuer) (err error) {
log := logf.FromContext(ctx)
issuerCopy := iss.DeepCopy()
defer func() {
if _, saveErr := c.updateIssuerStatus(iss, issuerCopy); saveErr != nil {
@ -71,7 +73,7 @@ func (c *Controller) Sync(ctx context.Context, iss *v1alpha1.ClusterIssuer) (err
err = i.Setup(ctx)
if err != nil {
s := messageErrorInitIssuer + err.Error()
klog.Info(s)
log.Error(err, "error setting up issuer")
c.Recorder.Event(issuerCopy, v1.EventTypeWarning, errorInitIssuer, s)
return err
}