Refactor certificates controller to make methods shareable

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2019-07-10 23:15:33 +01:00
parent 8d61145516
commit 7edbd829c4
4 changed files with 94 additions and 66 deletions

View File

@ -11,7 +11,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/api/util:go_default_library",
"//pkg/apis/certmanager:go_default_library",
"//pkg/apis/certmanager/v1alpha1:go_default_library",
"//pkg/apis/certmanager/validation:go_default_library",
"//pkg/client/clientset/versioned:go_default_library",

View File

@ -19,10 +19,14 @@ package certificates
import (
"fmt"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
logf "github.com/jetstack/cert-manager/pkg/logs"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
logf "github.com/jetstack/cert-manager/pkg/logs"
)
func (c *controller) handleGenericIssuer(obj interface{}) {
@ -51,34 +55,36 @@ func (c *controller) handleGenericIssuer(obj interface{}) {
}
}
func (c *controller) handleSecretResource(obj interface{}) {
log := c.log.WithName("handleSecretResource")
func secretResourceHandler(log logr.Logger, certificateLister cmlisters.CertificateLister, queue workqueue.Interface) func(obj interface{}) {
return func(obj interface{}) {
log := log.WithName("handleSecretResource")
secret, ok := obj.(*corev1.Secret)
if !ok {
log.Error(nil, "object is not a Secret resource")
return
}
log = logf.WithResource(log, secret)
crts, err := c.certificatesForSecret(secret)
if err != nil {
log.Error(err, "error looking up Certificates observing Secret")
return
}
for _, crt := range crts {
log := logf.WithRelatedResource(log, crt)
key, err := keyFunc(crt)
if err != nil {
log.Error(err, "error computing key for resource")
continue
secret, ok := obj.(*corev1.Secret)
if !ok {
log.Error(nil, "object is not a Secret resource")
return
}
log = logf.WithResource(log, secret)
crts, err := certificatesForSecret(certificateLister, secret)
if err != nil {
log.Error(err, "error looking up Certificates observing Secret")
return
}
for _, crt := range crts {
log := logf.WithRelatedResource(log, crt)
key, err := keyFunc(crt)
if err != nil {
log.Error(err, "error computing key for resource")
continue
}
queue.Add(key)
}
c.queue.Add(key)
}
}
func (c *controller) certificatesForSecret(secret *corev1.Secret) ([]*cmapi.Certificate, error) {
crts, err := c.certificateLister.List(labels.NewSelector())
func certificatesForSecret(certificateLister cmlisters.CertificateLister, secret *corev1.Secret) ([]*cmapi.Certificate, error) {
crts, err := certificateLister.List(labels.NewSelector())
if err != nil {
return nil, fmt.Errorf("error listing certificiates: %s", err.Error())

View File

@ -58,6 +58,7 @@ type controller struct {
// used for testing
clock clock.Clock
// used to record Events about resources to the API
recorder record.EventRecorder
@ -82,13 +83,15 @@ type controller struct {
// and certificate spec.
// This is a field on the controller struct to avoid having to maintain a reference
// to the controller context, and to make it easier to fake out this call during tests.
calculateDurationUntilRenew func(ctx context.Context, cert *x509.Certificate, crt *v1alpha1.Certificate) time.Duration
calculateDurationUntilRenew calculateDurationUntilRenewFn
// if addOwnerReferences is enabled then the controller will add owner references
// to the secret resources it creates
addOwnerReferences bool
}
type calculateDurationUntilRenewFn func(context.Context, *x509.Certificate, *v1alpha1.Certificate) time.Duration
// Register registers and constructs the controller using the provided context.
// It returns the workqueue to be used to enqueue items, a list of
// InformerSynced functions that must be synced, or an error.
@ -133,9 +136,9 @@ func (c *controller) Register(ctx *controllerpkg.Context) (workqueue.RateLimitin
// register handler functions
certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: c.queue})
issuerInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: c.handleGenericIssuer})
secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: c.handleSecretResource})
secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: secretResourceHandler(c.log, c.certificateLister, c.queue)})
ordersInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
WorkFunc: controllerpkg.HandleOwnedResourceNamespacedFunc(c.log, c.queue, certificateGvk, c.certificateGetter),
WorkFunc: controllerpkg.HandleOwnedResourceNamespacedFunc(c.log, c.queue, certificateGvk, certificateGetter(c.certificateLister)),
})
// Create a scheduled work queue that calls the ctrl.queue.Add method for
@ -173,30 +176,48 @@ func (c *controller) Register(ctx *controllerpkg.Context) (workqueue.RateLimitin
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
ctx = logf.NewContext(ctx, nil, ControllerName)
log := logf.FromContext(ctx)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Error(err, "invalid resource key")
crt, err := getCertificateForKey(ctx, key, c.certificateLister)
if k8sErrors.IsNotFound(err) {
log.Error(err, "certificate resource not found for key", "key", key)
return nil
}
if crt == nil {
log.Info("certificate resource not found for key", "key", key)
return nil
}
crt, err := c.certificateLister.Certificates(namespace).Get(name)
if err != nil {
if k8sErrors.IsNotFound(err) {
c.scheduledWorkQueue.Forget(key)
log.Error(err, "certificate in work queue no longer exists")
return nil
}
return err
}
ctx = logf.NewContext(ctx, logf.WithResource(log, crt))
return c.Sync(ctx, crt)
}
func (c *controller) certificateGetter(namespace, name string) (interface{}, error) {
return c.certificateLister.Certificates(namespace).Get(name)
type syncFn func(context.Context, *v1alpha1.Certificate) error
func getCertificateForKey(ctx context.Context, key string, lister cmlisters.CertificateLister) (*v1alpha1.Certificate, error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return nil, nil
}
crt, err := lister.Certificates(namespace).Get(name)
if k8sErrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
return crt, nil
}
func certificateGetter(lister cmlisters.CertificateLister) func(namespace, name string) (interface{}, error) {
return func(namespace, name string) (interface{}, error) {
return lister.Certificates(namespace).Get(name)
}
}
var keyFunc = controllerpkg.KeyFunc

View File

@ -34,15 +34,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
apiutil "github.com/jetstack/cert-manager/pkg/api/util"
"github.com/jetstack/cert-manager/pkg/apis/certmanager"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/validation"
cmclient "github.com/jetstack/cert-manager/pkg/client/clientset/versioned"
"github.com/jetstack/cert-manager/pkg/feature"
"github.com/jetstack/cert-manager/pkg/issuer"
logf "github.com/jetstack/cert-manager/pkg/logs"
"github.com/jetstack/cert-manager/pkg/metrics"
"github.com/jetstack/cert-manager/pkg/util"
"github.com/jetstack/cert-manager/pkg/util/errors"
"github.com/jetstack/cert-manager/pkg/util/kube"
@ -80,15 +82,15 @@ func (c *controller) Sync(ctx context.Context, crt *v1alpha1.Certificate) (err e
log := logf.FromContext(ctx)
dbg := log.V(logf.DebugLevel)
// TODO: if not 'certmanager.k8s.io, then use CertificateRequest stratagy if feature gate set
if !(crt.Spec.IssuerRef.Group == "" || crt.Spec.IssuerRef.Group == certmanager.GroupName) {
dbg.Info("certificate issuerRef group does not match certmanager group so skipping processing")
// if group name is set, use the new experimental controller implementation
if crt.Spec.IssuerRef.Group != "" {
log.Info("certificate issuerRef group is non-empty, skipping processing")
return nil
}
crtCopy := crt.DeepCopy()
defer func() {
if _, saveErr := c.updateCertificateStatus(ctx, crt, crtCopy); saveErr != nil {
if _, saveErr := updateCertificateStatus(ctx, c.metrics, c.cmClient, crt, crtCopy); saveErr != nil {
err = utilerrors.NewAggregate([]error{saveErr, err})
}
}()
@ -188,7 +190,7 @@ func (c *controller) Sync(ctx context.Context, crt *v1alpha1.Certificate) (err e
}
// begin checking if the TLS certificate is valid/needs a re-issue or renew
matches, matchErrs := c.certificateMatchesSpec(crtCopy, key, cert)
matches, matchErrs := certificateMatchesSpec(crtCopy, key, cert, c.secretLister)
if !matches {
dbg.Info("invoking issue function due to certificate not matching spec", "diff", strings.Join(matchErrs, ", "))
return c.issue(ctx, i, crtCopy)
@ -205,7 +207,7 @@ func (c *controller) Sync(ctx context.Context, crt *v1alpha1.Certificate) (err e
dbg.Info("Certificate does not need updating. Scheduling renewal.")
// If the Certificate is valid and up to date, we schedule a renewal in
// the future.
c.scheduleRenewal(ctx, crt)
scheduleRenewal(ctx, c.secretLister, c.calculateDurationUntilRenew, c.scheduledWorkQueue.Add, crt)
return nil
}
@ -222,7 +224,7 @@ func (c *controller) setCertificateStatus(crt *v1alpha1.Certificate, key crypto.
crt.Status.NotAfter = &metaNotAfter
// Derive & set 'Ready' condition on Certificate resource
matches, matchErrs := c.certificateMatchesSpec(crt, key, cert)
matches, matchErrs := certificateMatchesSpec(crt, key, cert, c.secretLister)
ready := v1alpha1.ConditionFalse
reason := ""
message := ""
@ -249,7 +251,7 @@ func (c *controller) setCertificateStatus(crt *v1alpha1.Certificate, key crypto.
return
}
func (c *controller) certificateMatchesSpec(crt *v1alpha1.Certificate, key crypto.Signer, cert *x509.Certificate) (bool, []string) {
func certificateMatchesSpec(crt *v1alpha1.Certificate, key crypto.Signer, cert *x509.Certificate, secretLister corelisters.SecretLister) (bool, []string) {
var errs []string
// TODO: add checks for KeySize, KeyAlgorithm fields
@ -284,7 +286,7 @@ func (c *controller) certificateMatchesSpec(crt *v1alpha1.Certificate, key crypt
// get a copy of the current secret resource
// Note that we already know that it exists, no need to check for errors
// TODO: Refactor so that the secret is passed as argument?
secret, err := c.secretLister.Secrets(crt.Namespace).Get(crt.Spec.SecretName)
secret, err := secretLister.Secrets(crt.Namespace).Get(crt.Spec.SecretName)
// validate that the issuer is correct
if crt.Spec.IssuerRef.Name != secret.Annotations[v1alpha1.IssuerNameAnnotationKey] {
@ -292,14 +294,14 @@ func (c *controller) certificateMatchesSpec(crt *v1alpha1.Certificate, key crypt
}
// validate that the issuer kind is correct
if issuerKind(crt) != secret.Annotations[v1alpha1.IssuerKindAnnotationKey] {
if issuerKind(crt.Spec.IssuerRef) != secret.Annotations[v1alpha1.IssuerKindAnnotationKey] {
errs = append(errs, fmt.Sprintf("Issuer kind of the certificate is not up to date: %q", secret.Annotations[v1alpha1.IssuerKindAnnotationKey]))
}
return len(errs) == 0, errs
}
func (c *controller) scheduleRenewal(ctx context.Context, crt *v1alpha1.Certificate) {
func scheduleRenewal(ctx context.Context, lister corelisters.SecretLister, calc calculateDurationUntilRenewFn, queueFn func(interface{}, time.Duration), crt *v1alpha1.Certificate) {
log := logf.FromContext(ctx)
log = log.WithValues(
logf.RelatedResourceNameKey, crt.Spec.SecretName,
@ -313,7 +315,7 @@ func (c *controller) scheduleRenewal(ctx context.Context, crt *v1alpha1.Certific
return
}
cert, err := kube.SecretTLSCert(ctx, c.secretLister, crt.Namespace, crt.Spec.SecretName)
cert, err := kube.SecretTLSCert(ctx, lister, crt.Namespace, crt.Spec.SecretName)
if err != nil {
if !errors.IsInvalidData(err) {
log.Error(err, "error getting secret for certificate resource")
@ -321,18 +323,18 @@ func (c *controller) scheduleRenewal(ctx context.Context, crt *v1alpha1.Certific
return
}
renewIn := c.calculateDurationUntilRenew(ctx, cert, crt)
c.scheduledWorkQueue.Add(key, renewIn)
renewIn := calc(ctx, cert, crt)
queueFn(key, renewIn)
log.WithValues("duration_until_renewal", renewIn.String()).Info("certificate scheduled for renewal")
}
// issuerKind returns the kind of issuer for a certificate
func issuerKind(crt *v1alpha1.Certificate) string {
if crt.Spec.IssuerRef.Kind == "" {
func issuerKind(ref v1alpha1.ObjectReference) string {
if ref.Kind == "" {
return v1alpha1.IssuerKind
}
return crt.Spec.IssuerRef.Kind
return ref.Kind
}
func ownerRef(crt *v1alpha1.Certificate) metav1.OwnerReference {
@ -448,7 +450,7 @@ func (c *controller) updateSecret(ctx context.Context, crt *v1alpha1.Certificate
// not just when a new certificate is issued
if x509Cert != nil {
secret.Annotations[v1alpha1.IssuerNameAnnotationKey] = crt.Spec.IssuerRef.Name
secret.Annotations[v1alpha1.IssuerKindAnnotationKey] = issuerKind(crt)
secret.Annotations[v1alpha1.IssuerKindAnnotationKey] = issuerKind(crt.Spec.IssuerRef)
secret.Annotations[v1alpha1.CommonNameAnnotationKey] = x509Cert.Subject.CommonName
secret.Annotations[v1alpha1.AltNamesAnnotationKey] = strings.Join(x509Cert.DNSNames, ",")
secret.Annotations[v1alpha1.IPSANAnnotationKey] = strings.Join(pki.IPAddressesToString(x509Cert.IPAddresses), ",")
@ -502,7 +504,7 @@ func (c *controller) issue(ctx context.Context, issuer issuer.Interface, crt *v1
if len(resp.Certificate) > 0 {
c.recorder.Event(crt, corev1.EventTypeNormal, successCertificateIssued, "Certificate issued successfully")
// as we have just written a certificate, we should schedule it for renewal
c.scheduleRenewal(ctx, crt)
scheduleRenewal(ctx, c.secretLister, c.calculateDurationUntilRenew, c.scheduledWorkQueue.Add, crt)
}
return nil
@ -584,8 +586,8 @@ func generateLocallySignedTemporaryCertificate(crt *v1alpha1.Certificate, pk []b
return b, nil
}
func (c *controller) updateCertificateStatus(ctx context.Context, old, new *v1alpha1.Certificate) (*v1alpha1.Certificate, error) {
defer c.metrics.UpdateCertificateStatus(new)
func updateCertificateStatus(ctx context.Context, m *metrics.Metrics, cmClient cmclient.Interface, old, new *v1alpha1.Certificate) (*v1alpha1.Certificate, error) {
defer m.UpdateCertificateStatus(new)
log := logf.FromContext(ctx, "updateStatus")
oldBytes, _ := json.Marshal(old.Status)
@ -597,5 +599,5 @@ func (c *controller) updateCertificateStatus(ctx context.Context, old, new *v1al
// TODO: replace Update call with UpdateStatus. This requires a custom API
// server with the /status subresource enabled and/or subresource support
// for CRDs (https://github.com/kubernetes/kubernetes/issues/38113)
return c.cmClient.CertmanagerV1alpha1().Certificates(new.Namespace).Update(new)
return cmClient.CertmanagerV1alpha1().Certificates(new.Namespace).Update(new)
}