Merge pull request #88 from jetstack-experimental/cleanup-on-exit
Plumb stopCh into workers
This commit is contained in:
commit
bd0be52548
@ -1,6 +1,7 @@
|
||||
package certificates
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@ -28,6 +29,7 @@ import (
|
||||
controllerpkg "github.com/jetstack-experimental/cert-manager/pkg/controller"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/issuer"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/scheduler"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/util"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
@ -37,7 +39,7 @@ type Controller struct {
|
||||
recorder record.EventRecorder
|
||||
|
||||
// To allow injection for testing.
|
||||
syncHandler func(key string) error
|
||||
syncHandler func(ctx context.Context, key string) error
|
||||
|
||||
issuerInformerSynced cache.InformerSynced
|
||||
issuerLister cmlisters.IssuerLister
|
||||
@ -154,7 +156,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
|
||||
for i := 0; i < workers; i++ {
|
||||
c.workerWg.Add(1)
|
||||
// TODO (@munnerz): make time.Second duration configurable
|
||||
go wait.Until(c.worker, time.Second, stopCh)
|
||||
go wait.Until(func() { c.worker(stopCh) }, time.Second, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
|
||||
@ -165,7 +167,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) worker() {
|
||||
func (c *Controller) worker(stopCh <-chan struct{}) {
|
||||
defer c.workerWg.Done()
|
||||
glog.V(4).Infof("Starting %s worker", ControllerName)
|
||||
for {
|
||||
@ -182,7 +184,10 @@ func (c *Controller) worker() {
|
||||
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %T", obj))
|
||||
return nil
|
||||
}
|
||||
if err := c.syncHandler(key); err != nil {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ctx = util.ContextWithStopCh(ctx, stopCh)
|
||||
if err := c.syncHandler(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
c.queue.Forget(obj)
|
||||
@ -200,7 +205,7 @@ func (c *Controller) worker() {
|
||||
glog.V(4).Infof("Exiting %s worker loop", ControllerName)
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem(key string) error {
|
||||
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
|
||||
@ -219,7 +224,7 @@ func (c *Controller) processNextWorkItem(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.Sync(crt)
|
||||
return c.Sync(ctx, crt)
|
||||
}
|
||||
|
||||
var keyFunc = controllerpkg.KeyFunc
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package certificates
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"reflect"
|
||||
@ -60,7 +61,7 @@ const (
|
||||
messageRenewalScheduled = "Certificate scheduled for renewal in %d hours"
|
||||
)
|
||||
|
||||
func (c *Controller) Sync(crt *v1alpha1.Certificate) (err error) {
|
||||
func (c *Controller) Sync(ctx context.Context, crt *v1alpha1.Certificate) (err error) {
|
||||
// step zero: check if the referenced issuer exists and is ready
|
||||
issuerObj, err := c.issuerLister.Issuers(crt.Namespace).Get(crt.Spec.Issuer)
|
||||
|
||||
@ -115,13 +116,13 @@ func (c *Controller) Sync(crt *v1alpha1.Certificate) (err error) {
|
||||
// if the certificate was not found, or the certificate data is invalid, we
|
||||
// should issue a new certificate
|
||||
if k8sErrors.IsNotFound(err) || errors.IsInvalidData(err) {
|
||||
return c.issue(i, crt)
|
||||
return c.issue(ctx, i, crt)
|
||||
}
|
||||
|
||||
// if the certificate is valid for a list of domains other than those
|
||||
// listed in the certificate spec, we should re-issue the certificate
|
||||
if !util.EqualUnsorted(crt.Spec.Domains, cert.DNSNames) {
|
||||
return c.issue(i, crt)
|
||||
return c.issue(ctx, i, crt)
|
||||
}
|
||||
|
||||
// calculate the amount of time until expiry
|
||||
@ -132,7 +133,7 @@ func (c *Controller) Sync(crt *v1alpha1.Certificate) (err error) {
|
||||
|
||||
// if we should being attempting to renew now, then trigger a renewal
|
||||
if renewIn <= 0 {
|
||||
return c.renew(i, crt)
|
||||
return c.renew(ctx, i, crt)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -173,9 +174,9 @@ func (c *Controller) scheduleRenewal(crt *v1alpha1.Certificate) {
|
||||
c.recorder.Event(crt, api.EventTypeNormal, successRenewalScheduled, s)
|
||||
}
|
||||
|
||||
func (c *Controller) prepare(issuer issuer.Interface, crt *v1alpha1.Certificate) (err error) {
|
||||
func (c *Controller) prepare(ctx context.Context, issuer issuer.Interface, crt *v1alpha1.Certificate) (err error) {
|
||||
var status v1alpha1.CertificateStatus
|
||||
status, err = issuer.Prepare(crt)
|
||||
status, err = issuer.Prepare(ctx, crt)
|
||||
|
||||
defer func() {
|
||||
if saveErr := c.updateCertificateStatus(crt, status); saveErr != nil {
|
||||
@ -192,12 +193,12 @@ func (c *Controller) prepare(issuer issuer.Interface, crt *v1alpha1.Certificate)
|
||||
|
||||
// return an error on failure. If retrieval is succesful, the certificate data
|
||||
// and private key will be stored in the named secret
|
||||
func (c *Controller) issue(issuer issuer.Interface, crt *v1alpha1.Certificate) (err error) {
|
||||
func (c *Controller) issue(ctx context.Context, issuer issuer.Interface, crt *v1alpha1.Certificate) (err error) {
|
||||
s := messagePreparingCertificate
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeNormal, reasonPreparingCertificate, s)
|
||||
|
||||
if err := c.prepare(issuer, crt); err != nil {
|
||||
if err := c.prepare(ctx, issuer, crt); err != nil {
|
||||
s := messageErrorPreparingCertificate + err.Error()
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeWarning, errorPreparingCertificate, s)
|
||||
@ -208,7 +209,7 @@ func (c *Controller) issue(issuer issuer.Interface, crt *v1alpha1.Certificate) (
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeNormal, reasonIssuingCertificate, s)
|
||||
|
||||
status, key, cert, err := issuer.Issue(crt)
|
||||
status, key, cert, err := issuer.Issue(ctx, crt)
|
||||
|
||||
defer func() {
|
||||
if saveErr := c.updateCertificateStatus(crt, status); saveErr != nil {
|
||||
@ -255,12 +256,12 @@ func (c *Controller) issue(issuer issuer.Interface, crt *v1alpha1.Certificate) (
|
||||
// renew will attempt to renew a certificate from the specified issuer, or
|
||||
// return an error on failure. If renewal is succesful, the certificate data
|
||||
// and private key will be stored in the named secret
|
||||
func (c *Controller) renew(issuer issuer.Interface, crt *v1alpha1.Certificate) error {
|
||||
func (c *Controller) renew(ctx context.Context, issuer issuer.Interface, crt *v1alpha1.Certificate) error {
|
||||
s := messagePreparingCertificate
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeNormal, reasonPreparingCertificate, s)
|
||||
|
||||
if err := c.prepare(issuer, crt); err != nil {
|
||||
if err := c.prepare(ctx, issuer, crt); err != nil {
|
||||
s := messageErrorPreparingCertificate + err.Error()
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeWarning, errorPreparingCertificate, s)
|
||||
@ -271,7 +272,7 @@ func (c *Controller) renew(issuer issuer.Interface, crt *v1alpha1.Certificate) e
|
||||
glog.Info(s)
|
||||
c.recorder.Event(crt, api.EventTypeNormal, reasonRenewingCertificate, s)
|
||||
|
||||
status, key, cert, err := issuer.Renew(crt)
|
||||
status, key, cert, err := issuer.Renew(ctx, crt)
|
||||
|
||||
defer func() {
|
||||
if saveErr := c.updateCertificateStatus(crt, status); saveErr != nil {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package issuers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
@ -25,6 +26,7 @@ import (
|
||||
cmlisters "github.com/jetstack-experimental/cert-manager/pkg/client/listers/certmanager/v1alpha1"
|
||||
controllerpkg "github.com/jetstack-experimental/cert-manager/pkg/controller"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/issuer"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/util"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
@ -34,7 +36,7 @@ type Controller struct {
|
||||
recorder record.EventRecorder
|
||||
|
||||
// To allow injection for testing.
|
||||
syncHandler func(key string) error
|
||||
syncHandler func(ctx context.Context, key string) error
|
||||
|
||||
issuerInformerSynced cache.InformerSynced
|
||||
issuerLister cmlisters.IssuerLister
|
||||
@ -104,7 +106,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
|
||||
for i := 0; i < workers; i++ {
|
||||
c.workerWg.Add(1)
|
||||
// TODO (@munnerz): make time.Second duration configurable
|
||||
go wait.Until(c.worker, time.Second, stopCh)
|
||||
go wait.Until(func() { c.worker(stopCh) }, time.Second, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
|
||||
@ -115,7 +117,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) worker() {
|
||||
func (c *Controller) worker(stopCh <-chan struct{}) {
|
||||
defer c.workerWg.Done()
|
||||
log.Printf("starting worker")
|
||||
for {
|
||||
@ -132,7 +134,10 @@ func (c *Controller) worker() {
|
||||
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %T", obj))
|
||||
return nil
|
||||
}
|
||||
if err := c.syncHandler(key); err != nil {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ctx = util.ContextWithStopCh(ctx, stopCh)
|
||||
if err := c.syncHandler(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
c.queue.Forget(obj)
|
||||
@ -150,7 +155,7 @@ func (c *Controller) worker() {
|
||||
log.Printf("exiting worker loop")
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem(key string) error {
|
||||
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
|
||||
@ -168,7 +173,7 @@ func (c *Controller) processNextWorkItem(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.Sync(issuer)
|
||||
return c.Sync(ctx, issuer)
|
||||
}
|
||||
|
||||
var keyFunc = controllerpkg.KeyFunc
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package issuers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
@ -16,7 +17,7 @@ const (
|
||||
messageErrorInitIssuer = "Error initializing issuer: "
|
||||
)
|
||||
|
||||
func (c *Controller) Sync(iss *v1alpha1.Issuer) (err error) {
|
||||
func (c *Controller) Sync(ctx context.Context, iss *v1alpha1.Issuer) (err error) {
|
||||
i, err := c.issuerFactory.IssuerFor(iss)
|
||||
|
||||
if err != nil {
|
||||
@ -24,7 +25,7 @@ func (c *Controller) Sync(iss *v1alpha1.Issuer) (err error) {
|
||||
}
|
||||
|
||||
var status v1alpha1.IssuerStatus
|
||||
status, err = i.Setup()
|
||||
status, err = i.Setup(ctx)
|
||||
|
||||
defer func() {
|
||||
if saveErr := c.updateIssuerStatus(iss, status); saveErr != nil {
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1listers "k8s.io/client-go/listers/core/v1"
|
||||
@ -432,14 +433,19 @@ func testReachability(ctx context.Context, domain, path, key string) error {
|
||||
// CleanUp will ensure the created service and ingress are clean/deleted of any
|
||||
// cert-manager created data.
|
||||
func (s *Solver) CleanUp(ctx context.Context, crt *v1alpha1.Certificate, domain, token, key string) error {
|
||||
var errs []error
|
||||
if err := s.cleanupJob(crt, domain); err != nil {
|
||||
return fmt.Errorf("[%s] Error cleaning up job: %s", domain, err.Error())
|
||||
errs = append(errs, fmt.Errorf("[%s] Error cleaning up job: %s", domain, err.Error()))
|
||||
}
|
||||
if err := s.cleanupService(crt, domain); err != nil {
|
||||
return fmt.Errorf("[%s] Error cleaning up service: %s", domain, err.Error())
|
||||
errs = append(errs, fmt.Errorf("[%s] Error cleaning up service: %s", domain, err.Error()))
|
||||
}
|
||||
if err := s.cleanupIngress(crt, svcNameFunc(crt.Name, domain), domain, token, labelsForCert(crt, domain)); err != nil {
|
||||
return fmt.Errorf("[%s] Error cleaning up ingress: %s", domain, err.Error())
|
||||
errs = append(errs, fmt.Errorf("[%s] Error cleaning up ingress: %s", domain, err.Error()))
|
||||
}
|
||||
|
||||
if errs != nil {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -27,7 +27,7 @@ const (
|
||||
messageCertIssued = "Certificate issued successfully"
|
||||
)
|
||||
|
||||
func (a *Acme) obtainCertificate(crt *v1alpha1.Certificate) ([]byte, []byte, error) {
|
||||
func (a *Acme) obtainCertificate(ctx context.Context, crt *v1alpha1.Certificate) ([]byte, []byte, error) {
|
||||
if crt.Spec.ACME == nil {
|
||||
return nil, nil, fmt.Errorf("acme config must be specified")
|
||||
}
|
||||
@ -68,7 +68,7 @@ func (a *Acme) obtainCertificate(crt *v1alpha1.Certificate) ([]byte, []byte, err
|
||||
}
|
||||
|
||||
certSlice, certURL, err := cl.CreateCert(
|
||||
context.Background(),
|
||||
ctx,
|
||||
csr,
|
||||
0,
|
||||
true,
|
||||
@ -87,9 +87,9 @@ func (a *Acme) obtainCertificate(crt *v1alpha1.Certificate) ([]byte, []byte, err
|
||||
return pki.EncodePKCS1PrivateKey(key), certBuffer.Bytes(), nil
|
||||
}
|
||||
|
||||
func (a *Acme) Issue(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
func (a *Acme) Issue(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
update := crt.DeepCopy()
|
||||
key, cert, err := a.obtainCertificate(crt)
|
||||
key, cert, err := a.obtainCertificate(ctx, crt)
|
||||
if err != nil {
|
||||
s := messageErrorIssueCert + err.Error()
|
||||
update.UpdateStatusCondition(v1alpha1.CertificateConditionReady, v1alpha1.ConditionFalse, errorIssueCert, s)
|
||||
|
||||
@ -38,7 +38,7 @@ const (
|
||||
//
|
||||
// It will send the appropriate Letsencrypt authorizations, and complete
|
||||
// challenge requests if neccessary.
|
||||
func (a *Acme) Prepare(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, error) {
|
||||
func (a *Acme) Prepare(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, error) {
|
||||
update := crt.DeepCopy()
|
||||
|
||||
log.Printf("getting private key for acme issuer %s/%s", a.issuer.Namespace, a.issuer.Name)
|
||||
@ -56,7 +56,7 @@ func (a *Acme) Prepare(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, e
|
||||
}
|
||||
|
||||
// step one: check issuer to see if we already have authorizations
|
||||
toAuthorize, err := authorizationsToObtain(cl, *crt)
|
||||
toAuthorize, err := authorizationsToObtain(ctx, cl, *crt)
|
||||
|
||||
if err != nil {
|
||||
s := messageErrorCheckAuthorization + err.Error()
|
||||
@ -72,7 +72,7 @@ func (a *Acme) Prepare(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, e
|
||||
return update.Status, nil
|
||||
}
|
||||
|
||||
auths, err := getAuthorizations(cl, toAuthorize...)
|
||||
auths, err := getAuthorizations(ctx, cl, toAuthorize...)
|
||||
|
||||
if err != nil {
|
||||
s := messageErrorCheckAuthorization + err.Error()
|
||||
@ -92,7 +92,7 @@ func (a *Acme) Prepare(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, e
|
||||
wg.Add(1)
|
||||
go func(auth authResponse) {
|
||||
defer wg.Done()
|
||||
a, err := a.authorize(cl, crt, auth)
|
||||
a, err := a.authorize(ctx, cl, crt, auth)
|
||||
resultChan <- struct {
|
||||
authResponse
|
||||
*acme.Authorization
|
||||
@ -141,7 +141,7 @@ func keyForChallenge(cl *acme.Client, challenge *acme.Challenge) (string, error)
|
||||
return "", err
|
||||
}
|
||||
|
||||
func (a *Acme) authorize(cl *acme.Client, crt *v1alpha1.Certificate, auth authResponse) (*acme.Authorization, error) {
|
||||
func (a *Acme) authorize(ctx context.Context, cl *acme.Client, crt *v1alpha1.Certificate, auth authResponse) (*acme.Authorization, error) {
|
||||
glog.V(4).Infof("picking challenge type for domain '%s'", auth.domain)
|
||||
challengeType, err := pickChallengeType(auth.domain, auth.auth, crt.Spec.ACME.Config)
|
||||
if err != nil {
|
||||
@ -162,27 +162,27 @@ func (a *Acme) authorize(cl *acme.Client, crt *v1alpha1.Certificate, auth authRe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer solver.CleanUp(context.Background(), crt, auth.domain, token, key)
|
||||
defer solver.CleanUp(ctx, crt, auth.domain, token, key)
|
||||
|
||||
a.recorder.Eventf(crt, v1.EventTypeNormal, reasonPresentChallenge, messagePresentChallenge, challengeType, auth.domain)
|
||||
err = solver.Present(context.Background(), crt, auth.domain, token, key)
|
||||
err = solver.Present(ctx, crt, auth.domain, token, key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error presenting acme authorization for domain '%s': %s", auth.domain, err.Error())
|
||||
}
|
||||
|
||||
a.recorder.Eventf(crt, v1.EventTypeNormal, reasonSelfCheck, messageSelfCheck, auth.domain)
|
||||
err = solver.Wait(context.Background(), crt, auth.domain, token, key)
|
||||
err = solver.Wait(ctx, crt, auth.domain, token, key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error waiting for key to be available for domain '%s': %s", auth.domain, err.Error())
|
||||
}
|
||||
|
||||
challenge, err = cl.Accept(context.Background(), challenge)
|
||||
challenge, err = cl.Accept(ctx, challenge)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error accepting acme challenge for domain '%s': %s", auth.domain, err.Error())
|
||||
}
|
||||
|
||||
glog.V(4).Infof("waiting for authorization for domain %s (%s)...", auth.domain, challenge.URI)
|
||||
authorization, err := cl.WaitAuthorization(context.Background(), challenge.URI)
|
||||
authorization, err := cl.WaitAuthorization(ctx, challenge.URI)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error waiting for authorization for domain '%s': %s", auth.domain, err.Error())
|
||||
}
|
||||
@ -196,8 +196,8 @@ func (a *Acme) authorize(cl *acme.Client, crt *v1alpha1.Certificate, auth authRe
|
||||
return authorization, nil
|
||||
}
|
||||
|
||||
func checkAuthorization(cl *acme.Client, uri string) (bool, error) {
|
||||
a, err := cl.GetAuthorization(context.Background(), uri)
|
||||
func checkAuthorization(ctx context.Context, cl *acme.Client, uri string) (bool, error) {
|
||||
a, err := cl.GetAuthorization(ctx, uri)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -218,14 +218,14 @@ func authorizationsMap(list []v1alpha1.ACMEDomainAuthorization) map[string]v1alp
|
||||
return out
|
||||
}
|
||||
|
||||
func authorizationsToObtain(cl *acme.Client, crt v1alpha1.Certificate) ([]string, error) {
|
||||
func authorizationsToObtain(ctx context.Context, cl *acme.Client, crt v1alpha1.Certificate) ([]string, error) {
|
||||
authMap := authorizationsMap(crt.Status.ACMEStatus().Authorizations)
|
||||
toAuthorize := util.StringFilter(func(domain string) (bool, error) {
|
||||
auth, ok := authMap[domain]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
return checkAuthorization(cl, auth.URI)
|
||||
return checkAuthorization(ctx, cl, auth.URI)
|
||||
}, crt.Spec.Domains...)
|
||||
|
||||
domains := make([]string, len(toAuthorize))
|
||||
@ -260,12 +260,12 @@ func (a authResponses) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getAuthorizations(cl *acme.Client, domains ...string) ([]authResponse, error) {
|
||||
func getAuthorizations(ctx context.Context, cl *acme.Client, domains ...string) ([]authResponse, error) {
|
||||
respCh := make(chan authResponse)
|
||||
defer close(respCh)
|
||||
for _, d := range domains {
|
||||
go func(domain string) {
|
||||
auth, err := cl.Authorize(context.Background(), domain)
|
||||
auth, err := cl.Authorize(ctx, domain)
|
||||
|
||||
if err != nil {
|
||||
respCh <- authResponse{"", nil, fmt.Errorf("getting acme authorization failed: %s", err.Error())}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package acme
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
)
|
||||
|
||||
@ -12,9 +14,9 @@ const (
|
||||
messageCertRenewed = "Certificate renewed successfully"
|
||||
)
|
||||
|
||||
func (a *Acme) Renew(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
func (a *Acme) Renew(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
update := crt.DeepCopy()
|
||||
key, cert, err := a.obtainCertificate(crt)
|
||||
key, cert, err := a.obtainCertificate(ctx, crt)
|
||||
if err != nil {
|
||||
s := messageErrorIssueCert + err.Error()
|
||||
update.UpdateStatusCondition(v1alpha1.CertificateConditionReady, v1alpha1.ConditionFalse, errorRenewCert, s)
|
||||
|
||||
@ -30,7 +30,7 @@ const (
|
||||
messageAccountVerified = "The ACME account was verified with the ACME server"
|
||||
)
|
||||
|
||||
func (a *Acme) Setup() (v1alpha1.IssuerStatus, error) {
|
||||
func (a *Acme) Setup(ctx context.Context) (v1alpha1.IssuerStatus, error) {
|
||||
update := a.issuer.DeepCopy()
|
||||
|
||||
accountPrivKey, err := kube.SecretTLSKey(a.secretsLister, a.issuer.Namespace, a.issuer.Spec.ACME.PrivateKey)
|
||||
@ -50,7 +50,7 @@ func (a *Acme) Setup() (v1alpha1.IssuerStatus, error) {
|
||||
DirectoryURL: a.issuer.Spec.ACME.Server,
|
||||
}
|
||||
|
||||
_, err = cl.GetReg(context.Background(), a.issuer.Status.ACMEStatus().URI)
|
||||
_, err = cl.GetReg(ctx, a.issuer.Status.ACMEStatus().URI)
|
||||
|
||||
if err == nil {
|
||||
update.UpdateStatusCondition(v1alpha1.IssuerConditionReady, v1alpha1.ConditionTrue, successAccountVerified, messageAccountVerified)
|
||||
@ -65,8 +65,7 @@ func (a *Acme) Setup() (v1alpha1.IssuerStatus, error) {
|
||||
Contact: []string{fmt.Sprintf("mailto:%s", strings.ToLower(a.issuer.Spec.ACME.Email))},
|
||||
}
|
||||
|
||||
// todo (@munnerz): don't use ctx.Background() here
|
||||
account, err := cl.Register(context.Background(), acc, acme.AcceptTOS)
|
||||
account, err := cl.Register(ctx, acc, acme.AcceptTOS)
|
||||
|
||||
if err != nil {
|
||||
s := messageAccountRegistrationFailed + err.Error()
|
||||
|
||||
@ -2,6 +2,7 @@ package ca
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
@ -34,7 +35,7 @@ const (
|
||||
defaultOrganization = "cert-manager"
|
||||
)
|
||||
|
||||
func (c *CA) Issue(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
func (c *CA) Issue(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
update := crt.DeepCopy()
|
||||
|
||||
signeeKey, err := kube.SecretTLSKey(c.secretsLister, c.issuer.Namespace, crt.Spec.SecretName)
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
package ca
|
||||
|
||||
import "github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
)
|
||||
|
||||
// Prepare does nothing for the CA issuer. In future, this may validate
|
||||
// the certificate request against the issuer, or set fields in the Status
|
||||
// block to be consumed in Issue and Renew
|
||||
func (c *CA) Prepare(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, error) {
|
||||
func (c *CA) Prepare(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, error) {
|
||||
return crt.Status, nil
|
||||
}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package ca
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/util/kube"
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/util/pki"
|
||||
@ -16,7 +18,7 @@ const (
|
||||
messageCertRenewed = "Certificate issued successfully"
|
||||
)
|
||||
|
||||
func (c *CA) Renew(crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
func (c *CA) Renew(ctx context.Context, crt *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error) {
|
||||
update := crt.DeepCopy()
|
||||
|
||||
signeeKey, err := kube.SecretTLSKey(c.secretsLister, c.issuer.Namespace, crt.Spec.SecretName)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package ca
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -23,7 +24,7 @@ const (
|
||||
messageKeyPairVerified = "Signing CA verified"
|
||||
)
|
||||
|
||||
func (c *CA) Setup() (v1alpha1.IssuerStatus, error) {
|
||||
func (c *CA) Setup(ctx context.Context) (v1alpha1.IssuerStatus, error) {
|
||||
update := c.issuer.DeepCopy()
|
||||
|
||||
cert, err := kube.SecretTLSCert(c.secretsLister, update.Namespace, update.Spec.CA.SecretRef.Name)
|
||||
|
||||
@ -1,18 +1,22 @@
|
||||
package issuer
|
||||
|
||||
import "github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager/v1alpha1"
|
||||
)
|
||||
|
||||
type Interface interface {
|
||||
// Setup initialises the issuer. This may include registering accounts with
|
||||
// a service, creating a CA and storing it somewhere, or verifying
|
||||
// credentials and authorization with a remote server.
|
||||
Setup() (v1alpha1.IssuerStatus, error)
|
||||
Setup(ctx context.Context) (v1alpha1.IssuerStatus, error)
|
||||
// Prepare
|
||||
Prepare(*v1alpha1.Certificate) (v1alpha1.CertificateStatus, error)
|
||||
Prepare(context.Context, *v1alpha1.Certificate) (v1alpha1.CertificateStatus, error)
|
||||
// Issue attempts to issue a certificate as described by the certificate
|
||||
// resource given
|
||||
Issue(*v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error)
|
||||
Issue(context.Context, *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error)
|
||||
// Renew attempts to renew the certificate describe by the certificate
|
||||
// resource given. If no certificate exists, an error is returned.
|
||||
Renew(*v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error)
|
||||
Renew(context.Context, *v1alpha1.Certificate) (v1alpha1.CertificateStatus, []byte, []byte, error)
|
||||
}
|
||||
|
||||
17
pkg/util/context.go
Normal file
17
pkg/util/context.go
Normal file
@ -0,0 +1,17 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
func ContextWithStopCh(ctx context.Context, stopCh <-chan struct{}) context.Context {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
go func() {
|
||||
defer cancel()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-stopCh:
|
||||
}
|
||||
}()
|
||||
return ctx
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user