Add ACME challenge scheduler and process challenges for matching domains/types in serial

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2018-10-12 16:55:45 +01:00
parent 9d192e3ffe
commit 85d433cff9
11 changed files with 487 additions and 7 deletions

View File

@ -39,6 +39,7 @@ import (
intscheme "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/scheme"
informers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions"
"github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
dnsutil "github.com/jetstack/cert-manager/pkg/issuer/acme/dns/util"
"github.com/jetstack/cert-manager/pkg/util"
"github.com/jetstack/cert-manager/pkg/util/kube"
@ -68,7 +69,11 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
defer wg.Done()
glog.Infof("Starting %s controller", n)
err := fn(5, stopCh)
workers := 5
if n == scheduler.ControllerName {
workers = 1
}
err := fn(workers, stopCh)
if err != nil {
glog.Fatalf("error running %s controller: %s", n, err.Error())

View File

@ -26,6 +26,7 @@ import (
"github.com/jetstack/cert-manager/pkg/util"
challengescontroller "github.com/jetstack/cert-manager/pkg/controller/acmechallenges"
"github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
orderscontroller "github.com/jetstack/cert-manager/pkg/controller/acmeorders"
certificatescontroller "github.com/jetstack/cert-manager/pkg/controller/certificates"
clusterissuerscontroller "github.com/jetstack/cert-manager/pkg/controller/clusterissuers"
@ -99,6 +100,7 @@ var (
ingressshimcontroller.ControllerName,
orderscontroller.ControllerName,
challengescontroller.ControllerName,
scheduler.ControllerName,
}
)

View File

@ -28,6 +28,7 @@ import (
"github.com/jetstack/cert-manager/cmd/controller/app"
"github.com/jetstack/cert-manager/cmd/controller/app/options"
_ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges"
_ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
_ "github.com/jetstack/cert-manager/pkg/controller/acmeorders"
_ "github.com/jetstack/cert-manager/pkg/controller/certificates"
_ "github.com/jetstack/cert-manager/pkg/controller/clusterissuers"

View File

@ -84,6 +84,15 @@ type ChallengeSpec struct {
}
type ChallengeStatus struct {
// Processing is used to denote whether this challenge should be processed
// or not.
// This field will only be set to true by the 'scheduling' component.
// It will only be set to false by the 'challenges' controller, after the
// challenge has reached a final state or timed out.
// If this field is set to false, the challenge controller will not take
// any more action.
Processing bool `json:"processing"`
// Presented will be set to true if the challenge values for this challenge
// are currently 'presented'.
// This *does not* imply the self check is passing. Only that the values

View File

@ -0,0 +1,160 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang/glog"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
controllerpkg "github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/util"
)
type Controller struct {
controllerpkg.Context
// To allow injection for testing.
syncHandler func(ctx context.Context, key string) error
challengeLister cmlisters.ChallengeLister
stopCh <-chan struct{}
challengesHasSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
func New(ctx *controllerpkg.Context) *Controller {
ctrl := &Controller{Context: *ctx}
ctrl.syncHandler = ctrl.processNextWorkItem
ctrl.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*1, time.Second*10), "challenges-scheduler")
challengeInformer := ctrl.SharedInformerFactory.Certmanager().V1alpha1().Challenges()
challengeInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: ctrl.queue})
ctrl.challengesHasSynced = challengeInformer.Informer().HasSynced
ctrl.challengeLister = challengeInformer.Lister()
return ctrl
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
c.stopCh = stopCh
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.challengesHasSynced) {
return fmt.Errorf("error waiting for informer caches to sync")
}
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(func() {
defer wg.Done()
c.worker(stopCh)
},
time.Second, stopCh)
}
<-stopCh
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
glog.V(4).Infof("Waiting for workers to exit...")
wg.Wait()
glog.V(4).Infof("Workers exited.")
return nil
}
func (c *Controller) worker(stopCh <-chan struct{}) {
glog.V(4).Infof("Starting %q worker", ControllerName)
for {
obj, shutdown := c.queue.Get()
if shutdown {
break
}
var key string
err := func(obj interface{}) error {
defer c.queue.Done(obj)
var ok bool
if key, ok = obj.(string); !ok {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = util.ContextWithStopCh(ctx, stopCh)
glog.Infof("%s controller: syncing item '%s'", ControllerName, key)
if err := c.syncHandler(ctx, key); err != nil {
return err
}
c.queue.Forget(obj)
return nil
}(obj)
if err != nil {
glog.Errorf("%s controller: Re-queuing item %q due to error processing: %s", ControllerName, key, err.Error())
c.queue.AddRateLimited(obj)
continue
}
glog.Infof("%s controller: Finished processing work item %q", ControllerName, key)
}
glog.V(4).Infof("Exiting %q worker loop", ControllerName)
}
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
ch, err := c.challengeLister.Challenges(namespace).Get(name)
if err != nil {
if k8sErrors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("ch '%s' in work queue no longer exists", key))
return nil
}
return err
}
return c.Sync(ctx, ch)
}
var keyFunc = controllerpkg.KeyFunc
const (
ControllerName = "challenges-scheduler"
)
func init() {
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context) controllerpkg.Interface {
return New(ctx).Run
})
}

View File

@ -0,0 +1,113 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"github.com/jetstack/cert-manager/pkg/acme"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
)
// Sync will process a single ACME challenge resource in order to determine
// whether it can be scheduled for processing.
// This is currently extremelly primitive, and **will not** do intelligent
// things like bumping challenges that are for already expired or nearing expiry
// certificates to the 'front' of the queue.
//
// This may be something to do in future - we could use a resyncFunc to build
// a stack of challenges to process, and upon observation of a new challenge,
// re-evaluate the whole stack.
//
// For now, this function will simply be used to solve https://github.com/jetstack/cert-manager/issues/951
func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) error {
// If the challenge is already in a final state, there is nothing more for
// us to do.
if acme.IsFinalState(ch.Status.State) {
return nil
}
// If the challenge already has 'processing' set to true, there is nothing
// more for us to do.
// The 'acmechallenges' controller is responsible for setting this field to
// false once processing has completed.
if ch.Status.Processing == true {
return nil
}
// Begin the scheduling algorithm! Here, we must evaluate all challenges
// currently in the apiserver, and their current state, in order to determine
// whether we can begin processing this challenge.
allChallenges, err := c.challengeLister.List(labels.Everything())
if err != nil {
return err
}
// First, filter out all challenges that are *not* being processed.
// With our naive scheduling algorithm, we only care about avoiding *duplicate*
// challenges occurring at once.
inFlightChallenges := removeNotProcessingChallenges(allChallenges)
// if any other challenges are in-flight with the same challenge type and
// same dnsName, we will *not* mark this challenge as processing
for _, inFCh := range inFlightChallenges {
if ch.Spec.DNSName == inFCh.Spec.DNSName && ch.Spec.Type == inFCh.Spec.Type {
return fmt.Errorf("another %q challenge for challenge %q (domain %q) is in progress, waiting until it is complete before processing", ch.Spec.Type, ch.Name, ch.Spec.DNSName)
}
}
// if there are no 'conflicts' detected above, then we can mark this challenge
// as processing.
ch.Status.Processing = true
_, err = c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch)
if err != nil {
return err
}
// we ignore the return value from waitForCacheSync - if it is false, the
// controller will shutdown anyway.
_ = c.waitForCacheSync()
return nil
}
// removeNotProcessingChallenges will filter out challenges from the given slice
// that have status.processing set to false.
// TODO: we currently call this function on every call to Sync().
// In large deployments, this could cause high CPU and memory consumption as it
// works at O(n^2) complexity (i.e. for every challenge, we have to touch every
// challenge).
func removeNotProcessingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge {
// TODO: there's probably a more efficient way to manage this that doesn't
// involve constructing large slices and using append.
var ret []*cmapi.Challenge
for _, ch := range chs {
if ch.Status.Processing {
ret = append(ret, ch)
}
}
return ret
}
func (c *Controller) waitForCacheSync() bool {
return cache.WaitForCacheSync(c.stopCh, c.challengesHasSynced)
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"testing"
"k8s.io/apimachinery/pkg/runtime"
coretesting "k8s.io/client-go/testing"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
"github.com/jetstack/cert-manager/pkg/controller/test"
testpkg "github.com/jetstack/cert-manager/pkg/controller/test"
"github.com/jetstack/cert-manager/test/unit/gen"
)
type fixture struct {
Controller *Controller
*test.Builder
Challenge *v1alpha1.Challenge
PreFn func(*testing.T, *fixture)
CheckFn func(*testing.T, *fixture, ...interface{})
Err bool
Ctx context.Context
}
func TestSync(t *testing.T) {
tests := map[string]fixture{
"with one challenge in api, mark processing=true": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"))},
ExpectedActions: []testpkg.Action{
testpkg.NewAction(coretesting.NewUpdateAction(v1alpha1.SchemeGroupVersion.WithResource("challenges"), defaultTestNamespace,
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)))),
},
},
Challenge: gen.Challenge("test", setDNSName("example.com")),
},
"when a duplicate challenge exists in the API, and is processing, don't mark next one as processing": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test", setDNSName("example.com")),
Err: true,
},
"skip elements that are already marked as processing=true": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
"skip elements that are already in a final state": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(v1alpha1.Invalid)),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(v1alpha1.Invalid)),
},
}
for n, test := range tests {
t.Run(n, func(t *testing.T) {
if test.Builder == nil {
test.Builder = &testpkg.Builder{}
}
test.Setup(t)
chalCopy := test.Challenge.DeepCopy()
err := test.Controller.Sync(test.Ctx, chalCopy)
if err != nil && !test.Err {
t.Errorf("Expected function to not error, but got: %v", err)
}
if err == nil && test.Err {
t.Errorf("Expected function to get an error, but got: %v", err)
}
test.Finish(t, chalCopy, err)
})
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"testing"
"github.com/jetstack/cert-manager/pkg/controller/test"
)
func (f *fixture) Setup(t *testing.T) {
if f.Ctx == nil {
f.Ctx = context.Background()
}
if f.Builder == nil {
f.Builder = &test.Builder{}
}
f.Controller = f.buildFakeController(f.Builder)
if f.PreFn != nil {
f.PreFn(t, f)
f.Builder.Sync()
}
}
func (f *fixture) Finish(t *testing.T, args ...interface{}) {
defer f.Builder.Stop()
if err := f.Builder.AllReactorsCalled(); err != nil {
t.Errorf("Not all expected reactors were called: %v", err)
}
if err := f.Builder.AllActionsExecuted(); err != nil {
t.Errorf(err.Error())
}
// resync listers before running checks
f.Builder.Sync()
// run custom checks
if f.CheckFn != nil {
f.CheckFn(t, f, args...)
}
}
func (f *fixture) buildFakeController(b *test.Builder) *Controller {
b.Start()
c := New(b.Context)
b.Sync()
return c
}

View File

@ -57,6 +57,12 @@ func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error)
oldChal := ch
ch = ch.DeepCopy()
// bail out early on if processing=false, as this challenge has not been
// scheduled yet.
if ch.Status.Processing == false {
return nil
}
defer func() {
// TODO: replace with more efficient comparison
if reflect.DeepEqual(oldChal.Status, ch.Status) {
@ -71,6 +77,8 @@ func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) (err error)
// if a challenge is in a final state, we bail out early as there is nothing
// left for us to do here.
if acme.IsFinalState(ch.Status.State) {
// we set processing to false now, as this item has finished being processed.
ch.Status.Processing = false
return nil
}

View File

@ -164,12 +164,6 @@ func testDNSProvider(name string, p dns01Provider) bool {
})
It("should obtain a signed certificate for a wildcard and apex domain", func() {
// We skip this test for now, as it will always fail until we implement
// 'serial' solving of ACME challenges.
// See https://github.com/jetstack/cert-manager/issues/951 for more info.
// This test **must** be enabled before a new release can be cut.
Skip("Test disabled pending #951 being implemented")
By("Creating a Certificate")
certClient := f.CertManagerClientSet.CertmanagerV1alpha1().Certificates(f.Namespace.Name)

View File

@ -81,3 +81,9 @@ func SetChallengeURL(s string) ChallengeModifier {
ch.Spec.URL = s
}
}
func SetChallengeProcessing(b bool) ChallengeModifier {
return func(ch *v1alpha1.Challenge) {
ch.Status.Processing = b
}
}