Update acmechallenges controller

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2019-03-01 00:15:36 +00:00
parent f4c0de2627
commit eaeefdf5b2
3 changed files with 52 additions and 44 deletions

View File

@ -20,17 +20,15 @@ go_library(
"//pkg/issuer/acme/dns:go_default_library",
"//pkg/issuer/acme/dns/util:go_default_library",
"//pkg/issuer/acme/http:go_default_library",
"//pkg/util:go_default_library",
"//pkg/logs:go_default_library",
"//third_party/crypto/acme:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -24,12 +24,10 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"github.com/jetstack/cert-manager/pkg/acme"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
@ -38,10 +36,12 @@ import (
"github.com/jetstack/cert-manager/pkg/issuer"
"github.com/jetstack/cert-manager/pkg/issuer/acme/dns"
"github.com/jetstack/cert-manager/pkg/issuer/acme/http"
"github.com/jetstack/cert-manager/pkg/util"
logf "github.com/jetstack/cert-manager/pkg/logs"
)
type Controller struct {
// the controllers root context, containing a controller scoped logger
ctx context.Context
controllerpkg.Context
helper issuer.Helper
@ -108,12 +108,17 @@ func New(ctx *controllerpkg.Context) *Controller {
ctrl.httpSolver = http.NewSolver(ctx)
ctrl.dnsSolver = dns.NewSolver(ctx)
ctrl.scheduler = scheduler.New(ctrl.challengeLister)
ctrl.ctx = logf.NewContext(ctx.RootContext, nil, ControllerName)
return ctrl
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
klog.V(4).Infof("Starting %s control loop", ControllerName)
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
log := logf.FromContext(ctx)
log.V(logf.DebugLevel).Info("starting control loop")
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.watchedInformers...) {
// c.challengeInformerSynced) {
@ -126,20 +131,20 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(func() {
defer wg.Done()
c.worker(stopCh)
c.worker(ctx)
},
time.Second, stopCh)
}
// TODO: properly plumb in stopCh and WaitGroup to scheduler
// Run the scheduler once per second
go wait.Until(c.runScheduler, time.Second*1, stopCh)
go wait.Until(func() { c.runScheduler(ctx) }, time.Second*1, stopCh)
<-stopCh
klog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
log.V(logf.DebugLevel).Info("shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
klog.V(4).Infof("Waiting for workers to exit...")
log.V(logf.DebugLevel).Info("waiting for workers to exit")
wg.Wait()
klog.V(4).Infof("Workers exited.")
log.V(logf.DebugLevel).Info("workers exited")
return nil
}
@ -155,20 +160,23 @@ const MaxChallengesPerSchedule = 20
// which, if any, challenges should be rescheduled.
// TODO: it should also only re-run the scheduler if a change to challenges has
// been observed, to save needless work
func (c *Controller) runScheduler() {
func (c *Controller) runScheduler(ctx context.Context) {
log := logf.FromContext(ctx, "scheduler")
toSchedule, err := c.scheduler.ScheduleN(MaxChallengesPerSchedule)
if err != nil {
runtime.HandleError(fmt.Errorf("Error determining set of challenges that should be scheduled for processing: %v", err))
log.Error(err, "error determining set of challenges that should be scheduled for processing")
return
}
for _, ch := range toSchedule {
log := logf.WithResource(log, ch)
ch = ch.DeepCopy()
ch.Status.Processing = true
_, err := c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch)
if err != nil {
runtime.HandleError(fmt.Errorf("Error scheduling challenge %s/%s for processing: %v", ch.Namespace, ch.Name, err))
log.Error(err, "error scheduling challenge for processing")
return
}
@ -176,16 +184,13 @@ func (c *Controller) runScheduler() {
}
if len(toSchedule) > 0 {
plural := ""
if len(toSchedule) > 1 {
plural = "s"
}
klog.V(4).Infof("Scheduled %d challenge%s for processing", len(toSchedule), plural)
log.V(logf.DebugLevel).Info("scheduled challenges for processing", "number_scheduled", len(toSchedule))
}
}
func (c *Controller) worker(stopCh <-chan struct{}) {
klog.V(4).Infof("Starting %q worker", ControllerName)
func (c *Controller) worker(ctx context.Context) {
log := logf.FromContext(ctx)
log.V(logf.DebugLevel).Info("starting worker")
for {
obj, shutdown := c.queue.Get()
if shutdown {
@ -200,26 +205,25 @@ func (c *Controller) worker(stopCh <-chan struct{}) {
if key, ok = obj.(string); !ok {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = util.ContextWithStopCh(ctx, stopCh)
klog.Infof("%s controller: syncing item '%s'", ControllerName, key)
log := log.WithValues("key", key)
log.Info("syncing resource")
if err := c.syncHandler(ctx, key); err != nil {
klog.Errorf("%s controller: Re-queuing item %q due to error processing: %s", ControllerName, key, err.Error())
log.Error(err, "re-queuing item due to error processing")
c.queue.AddRateLimited(obj)
return
}
klog.Infof("%s controller: Finished processing work item %q", ControllerName, key)
log.Info("finished processing work item")
c.queue.Forget(obj)
}()
}
klog.V(4).Infof("Exiting %q worker loop", ControllerName)
log.V(logf.DebugLevel).Info("exiting worker loop")
}
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
log := logf.FromContext(ctx)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
log.Error(err, "invalid resource key")
return nil
}
@ -227,13 +231,14 @@ func (c *Controller) processNextWorkItem(ctx context.Context, key string) error
if err != nil {
if k8sErrors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("ch '%s' in work queue no longer exists", key))
log.Error(err, "challenge in work queue no longer exists")
return nil
}
return err
}
ctx = logf.NewContext(ctx, logf.WithResource(log, ch))
return c.Sync(ctx, ch)
}

View File

@ -24,15 +24,14 @@ import (
corev1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog"
"github.com/jetstack/cert-manager/pkg/acme"
acmecl "github.com/jetstack/cert-manager/pkg/acme/client"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
controllerpkg "github.com/jetstack/cert-manager/pkg/controller"
acmeapi "github.com/jetstack/cert-manager/third_party/crypto/acme"
dnsutil "github.com/jetstack/cert-manager/pkg/issuer/acme/dns/util"
logf "github.com/jetstack/cert-manager/pkg/logs"
acmeapi "github.com/jetstack/cert-manager/third_party/crypto/acme"
)
const (
@ -55,6 +54,8 @@ type solver interface {
// Sync will process this ACME Challenge.
// It is the core control function for ACME challenges.
func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error) {
log := logf.FromContext(ctx).WithValues("dnsName", ch.Spec.DNSName, "type", ch.Spec.Type)
ctx = logf.NewContext(ctx, log)
oldChal := ch
ch = ch.DeepCopy()
@ -90,13 +91,13 @@ func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error)
if ch.Status.Presented {
solver, err := c.solverFor(ch.Spec.Type)
if err != nil {
klog.Errorf("Error getting solver for challenge %q (type %q): %v", ch.Name, ch.Spec.Type, err)
log.Error(err, "error getting solver for challenge")
return err
}
err = solver.CleanUp(ctx, genericIssuer, ch)
if err != nil {
klog.Errorf("Error cleaning up challenge %q on deletion: %v", ch.Name, err)
log.Error(err, "error cleaning up challenge")
return err
}
@ -170,7 +171,7 @@ func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error)
err = solver.Check(ctx, genericIssuer, ch)
if err != nil {
klog.Infof("propagation check failed: %v", err)
log.Error(err, "propagation check failed")
ch.Status.Reason = fmt.Sprintf("Waiting for %s challenge propagation: %s", ch.Spec.Type, err)
key, err := controllerpkg.KeyFunc(ch)
@ -194,11 +195,12 @@ func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error)
}
func (c *Controller) handleFinalizer(ctx context.Context, ch *cmapi.Challenge) error {
log := logf.FromContext(ctx, "finalizer")
if len(ch.Finalizers) == 0 {
return nil
}
if ch.Finalizers[0] != cmapi.ACMEFinalizer {
klog.V(4).Infof("Waiting to run challenge %q finalization...", ch.Name)
log.V(logf.DebugLevel).Info("waiting to run challenge finalization...")
return nil
}
ch.Finalizers = ch.Finalizers[1:]
@ -214,13 +216,13 @@ func (c *Controller) handleFinalizer(ctx context.Context, ch *cmapi.Challenge) e
solver, err := c.solverFor(ch.Spec.Type)
if err != nil {
klog.Errorf("Error getting solver for challenge %q (type %q): %v", ch.Name, ch.Spec.Type, err)
log.Error(err, "error getting solver for challenge")
return nil
}
err = solver.CleanUp(ctx, genericIssuer, ch)
if err != nil {
klog.Errorf("Error cleaning up challenge %q on deletion: %v", ch.Name, err)
log.Error(err, "error cleaning up challenge")
return nil
}
@ -263,7 +265,9 @@ func (c *Controller) syncChallengeStatus(ctx context.Context, cl acmecl.Interfac
// challenge if it failed, or the final state of the challenge's authorization
// if accepting the challenge succeeds.
func (c *Controller) acceptChallenge(ctx context.Context, cl acmecl.Interface, ch *cmapi.Challenge) error {
klog.Infof("Accepting challenge for domain %q", ch.Spec.DNSName)
log := logf.FromContext(ctx, "acceptChallenge")
log.Info("accepting challenge with ACME server")
// We manually construct an ACME challenge here from our own internal type
// to save additional round trips to the ACME server.
acmeChal := &acmeapi.Challenge{
@ -275,17 +279,18 @@ func (c *Controller) acceptChallenge(ctx context.Context, cl acmecl.Interface, c
ch.Status.State = cmapi.State(acmeChal.Status)
}
if err != nil {
klog.Infof("%s: Error accepting challenge: %v", ch.Name, err)
log.Error(err, "error accepting challenge")
ch.Status.Reason = fmt.Sprintf("Error accepting challenge: %v", err)
return err
}
klog.Infof("Waiting for authorization for domain %q", ch.Spec.DNSName)
log.Info("waiting for authorization for domain")
authorization, err := cl.WaitAuthorization(ctx, ch.Spec.AuthzURL)
if err != nil {
log.Error(err, "error waiting for authorization")
authErr, ok := err.(acmeapi.AuthorizationError)
if !ok {
klog.Infof("%s: Unexpected error waiting for authorization: %v", ch.Name, err)
return err
}