From bca6ed6e64a0db9dc36a4bd9698dcdbb83500e5b Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 29 Nov 2018 18:01:07 +0000 Subject: [PATCH] Switch ACME challenge scheduler to evaluate all challenges at once Signed-off-by: James Munnelly --- cmd/controller/BUILD.bazel | 1 - cmd/controller/app/BUILD.bazel | 1 - cmd/controller/app/controller.go | 4 - cmd/controller/app/options/BUILD.bazel | 1 - cmd/controller/app/options/options.go | 2 - cmd/controller/start.go | 1 - pkg/controller/acmechallenges/BUILD.bazel | 1 + pkg/controller/acmechallenges/controller.go | 50 +++ .../acmechallenges/scheduler/BUILD.bazel | 26 +- .../acmechallenges/scheduler/controller.go | 160 ---------- .../acmechallenges/scheduler/scheduler.go | 250 +++++++++++++++ .../scheduler/scheduler_test.go | 298 ++++++++++++++++++ .../acmechallenges/scheduler/sync.go | 116 ------- .../acmechallenges/scheduler/sync_test.go | 120 ------- .../acmechallenges/scheduler/util_test.go | 62 ---- 15 files changed, 606 insertions(+), 487 deletions(-) delete mode 100644 pkg/controller/acmechallenges/scheduler/controller.go create mode 100644 pkg/controller/acmechallenges/scheduler/scheduler.go create mode 100644 pkg/controller/acmechallenges/scheduler/scheduler_test.go delete mode 100644 pkg/controller/acmechallenges/scheduler/sync.go delete mode 100644 pkg/controller/acmechallenges/scheduler/sync_test.go delete mode 100644 pkg/controller/acmechallenges/scheduler/util_test.go diff --git a/cmd/controller/BUILD.bazel b/cmd/controller/BUILD.bazel index 9731db3b4..6710f6025 100644 --- a/cmd/controller/BUILD.bazel +++ b/cmd/controller/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//cmd/controller/app:go_default_library", "//cmd/controller/app/options:go_default_library", "//pkg/controller/acmechallenges:go_default_library", - "//pkg/controller/acmechallenges/scheduler:go_default_library", "//pkg/controller/acmeorders:go_default_library", "//pkg/controller/certificates:go_default_library", 
"//pkg/controller/clusterissuers:go_default_library", diff --git a/cmd/controller/app/BUILD.bazel b/cmd/controller/app/BUILD.bazel index 89d28d680..316468e2e 100644 --- a/cmd/controller/app/BUILD.bazel +++ b/cmd/controller/app/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "//pkg/client/clientset/versioned/scheme:go_default_library", "//pkg/client/informers/externalversions:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/acmechallenges/scheduler:go_default_library", "//pkg/issuer/acme/dns/util:go_default_library", "//pkg/metrics:go_default_library", "//pkg/util:go_default_library", diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 12c57e763..605957ffa 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -39,7 +39,6 @@ import ( intscheme "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/scheme" informers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions" "github.com/jetstack/cert-manager/pkg/controller" - "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler" dnsutil "github.com/jetstack/cert-manager/pkg/issuer/acme/dns/util" "github.com/jetstack/cert-manager/pkg/metrics" "github.com/jetstack/cert-manager/pkg/util" @@ -76,9 +75,6 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { glog.Infof("Starting %s controller", n) workers := 5 - if n == scheduler.ControllerName { - workers = 1 - } err := fn(workers, stopCh) if err != nil { diff --git a/cmd/controller/app/options/BUILD.bazel b/cmd/controller/app/options/BUILD.bazel index ff5eb91db..9fa415698 100644 --- a/cmd/controller/app/options/BUILD.bazel +++ b/cmd/controller/app/options/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/controller/acmechallenges:go_default_library", - "//pkg/controller/acmechallenges/scheduler:go_default_library", "//pkg/controller/acmeorders:go_default_library", 
"//pkg/controller/certificates:go_default_library", "//pkg/controller/clusterissuers:go_default_library", diff --git a/cmd/controller/app/options/options.go b/cmd/controller/app/options/options.go index 0c877d8b2..2ba4001c5 100644 --- a/cmd/controller/app/options/options.go +++ b/cmd/controller/app/options/options.go @@ -26,7 +26,6 @@ import ( "github.com/jetstack/cert-manager/pkg/util" challengescontroller "github.com/jetstack/cert-manager/pkg/controller/acmechallenges" - "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler" orderscontroller "github.com/jetstack/cert-manager/pkg/controller/acmeorders" certificatescontroller "github.com/jetstack/cert-manager/pkg/controller/certificates" clusterissuerscontroller "github.com/jetstack/cert-manager/pkg/controller/clusterissuers" @@ -106,7 +105,6 @@ var ( ingressshimcontroller.ControllerName, orderscontroller.ControllerName, challengescontroller.ControllerName, - scheduler.ControllerName, } ) diff --git a/cmd/controller/start.go b/cmd/controller/start.go index 3822b9abf..4e41198c2 100644 --- a/cmd/controller/start.go +++ b/cmd/controller/start.go @@ -28,7 +28,6 @@ import ( "github.com/jetstack/cert-manager/cmd/controller/app" "github.com/jetstack/cert-manager/cmd/controller/app/options" _ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges" - _ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler" _ "github.com/jetstack/cert-manager/pkg/controller/acmeorders" _ "github.com/jetstack/cert-manager/pkg/controller/certificates" _ "github.com/jetstack/cert-manager/pkg/controller/clusterissuers" diff --git a/pkg/controller/acmechallenges/BUILD.bazel b/pkg/controller/acmechallenges/BUILD.bazel index 79f6bee5b..506481187 100644 --- a/pkg/controller/acmechallenges/BUILD.bazel +++ b/pkg/controller/acmechallenges/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/apis/certmanager/v1alpha1:go_default_library", "//pkg/client/listers/certmanager/v1alpha1:go_default_library", 
"//pkg/controller:go_default_library", + "//pkg/controller/acmechallenges/scheduler:go_default_library", "//pkg/issuer/acme/dns:go_default_library", "//pkg/issuer/acme/http:go_default_library", "//pkg/util:go_default_library", diff --git a/pkg/controller/acmechallenges/controller.go b/pkg/controller/acmechallenges/controller.go index 769959b66..8c4e7d444 100644 --- a/pkg/controller/acmechallenges/controller.go +++ b/pkg/controller/acmechallenges/controller.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -33,6 +34,7 @@ import ( "github.com/jetstack/cert-manager/pkg/acme" cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1" controllerpkg "github.com/jetstack/cert-manager/pkg/controller" + "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler" "github.com/jetstack/cert-manager/pkg/issuer/acme/dns" "github.com/jetstack/cert-manager/pkg/issuer/acme/http" "github.com/jetstack/cert-manager/pkg/util" @@ -60,6 +62,8 @@ type Controller struct { watchedInformers []cache.InformerSynced queue workqueue.RateLimitingInterface + + scheduler *scheduler.Scheduler } func New(ctx *controllerpkg.Context) *Controller { @@ -101,6 +105,7 @@ func New(ctx *controllerpkg.Context) *Controller { ctrl.httpSolver = http.NewSolver(ctx) ctrl.dnsSolver = dns.NewSolver(ctx) + ctrl.scheduler = scheduler.New(ctrl.challengeLister) return ctrl } @@ -123,6 +128,10 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { }, time.Second, stopCh) } + // TODO: properly plumb in stopCh and WaitGroup to scheduler + // Run the scheduler once per second + go wait.Until(c.runScheduler, time.Second*1, stopCh) + <-stopCh glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown") c.queue.ShutDown() @@ -132,6 +141,47 @@ func (c *Controller) Run(workers int, stopCh <-chan 
struct{}) error { return nil } +// MaxChallengesPerSchedule is the maximum number of challenges that can be +// scheduled with a single call to the scheduler. +// This provides a very crude rate limit on how many challenges we will schedule +// per second. It may be better to remove this altogether in favour of some +// other method of rate limiting creations. +// TODO: make this configurable +const MaxChallengesPerSchedule = 20 + +// runScheduler will execute the scheduler's ScheduleN function to determine +// which, if any, challenges should be rescheduled. +// TODO: it should also only re-run the scheduler if a change to challenges has +// been observed, to save needless work +func (c *Controller) runScheduler() { + toSchedule, err := c.scheduler.ScheduleN(MaxChallengesPerSchedule) + if err != nil { + runtime.HandleError(fmt.Errorf("Error determining set of challenges that should be scheduled for processing: %v", err)) + return + } + + for _, ch := range toSchedule { + ch = ch.DeepCopy() + ch.Status.Processing = true + + _, err := c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch) + if err != nil { + runtime.HandleError(fmt.Errorf("Error scheduling challenge %s/%s for processing: %v", ch.Namespace, ch.Name, err)) + return + } + + c.Recorder.Event(ch, corev1.EventTypeNormal, "Started", "Challenge scheduled for processing") + } + + if len(toSchedule) > 0 { + plural := "" + if len(toSchedule) > 1 { + plural = "s" + } + glog.V(4).Infof("Scheduled %d challenge%s for processing", len(toSchedule), plural) + } +} + func (c *Controller) worker(stopCh <-chan struct{}) { glog.V(4).Infof("Starting %q worker", ControllerName) for { diff --git a/pkg/controller/acmechallenges/scheduler/BUILD.bazel b/pkg/controller/acmechallenges/scheduler/BUILD.bazel index 300836707..d4870b276 100644 --- a/pkg/controller/acmechallenges/scheduler/BUILD.bazel +++ b/pkg/controller/acmechallenges/scheduler/BUILD.bazel @@ -2,42 +2,30 @@ load("@io_bazel_rules_go//go:def.bzl", 
"go_library", "go_test") go_library( name = "go_default_library", - srcs = [ - "controller.go", - "sync.go", - ], + srcs = ["scheduler.go"], importpath = "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler", visibility = ["//visibility:public"], deps = [ "//pkg/acme:go_default_library", "//pkg/apis/certmanager/v1alpha1:go_default_library", "//pkg/client/listers/certmanager/v1alpha1:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) go_test( name = "go_default_test", - srcs = [ - "sync_test.go", - "util_test.go", - ], + srcs = ["scheduler_test.go"], embed = [":go_default_library"], deps = [ "//pkg/apis/certmanager/v1alpha1:go_default_library", - "//pkg/controller/test:go_default_library", + "//pkg/client/clientset/versioned/fake:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/util:go_default_library", "//test/unit/gen:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", ], ) diff --git a/pkg/controller/acmechallenges/scheduler/controller.go b/pkg/controller/acmechallenges/scheduler/controller.go deleted file mode 100644 index 085d4883b..000000000 --- a/pkg/controller/acmechallenges/scheduler/controller.go +++ /dev/null @@ -1,160 +0,0 
@@ -/* -Copyright 2018 The Jetstack cert-manager contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package scheduler - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/golang/glog" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - - cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1" - controllerpkg "github.com/jetstack/cert-manager/pkg/controller" - "github.com/jetstack/cert-manager/pkg/util" -) - -type Controller struct { - controllerpkg.Context - - // To allow injection for testing. 
- syncHandler func(ctx context.Context, key string) error - - challengeLister cmlisters.ChallengeLister - stopCh <-chan struct{} - - challengesHasSynced cache.InformerSynced - queue workqueue.RateLimitingInterface -} - -func New(ctx *controllerpkg.Context) *Controller { - ctrl := &Controller{Context: *ctx} - ctrl.syncHandler = ctrl.processNextWorkItem - - ctrl.queue = workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), "challenges-scheduler") - - challengeInformer := ctrl.SharedInformerFactory.Certmanager().V1alpha1().Challenges() - challengeInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: ctrl.queue}) - ctrl.challengesHasSynced = challengeInformer.Informer().HasSynced - ctrl.challengeLister = challengeInformer.Lister() - - return ctrl -} - -func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { - c.stopCh = stopCh - - glog.V(4).Infof("Starting %s control loop", ControllerName) - // wait for all the informer caches we depend on are synced - if !cache.WaitForCacheSync(stopCh, c.challengesHasSynced) { - return fmt.Errorf("error waiting for informer caches to sync") - } - - var wg sync.WaitGroup - for i := 0; i < workers; i++ { - wg.Add(1) - // TODO (@munnerz): make time.Second duration configurable - go wait.Until(func() { - defer wg.Done() - c.worker(stopCh) - }, - time.Second, stopCh) - } - <-stopCh - glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown") - c.queue.ShutDown() - glog.V(4).Infof("Waiting for workers to exit...") - wg.Wait() - glog.V(4).Infof("Workers exited.") - return nil -} - -func (c *Controller) worker(stopCh <-chan struct{}) { - glog.V(4).Infof("Starting %q worker", ControllerName) - for { - obj, shutdown := c.queue.Get() - if shutdown { - break - } - - var key string - err := func(obj interface{}) error { - defer c.queue.Done(obj) - var ok bool - if key, ok = obj.(string); !ok { - return nil - } - ctx, cancel := context.WithCancel(context.Background()) - 
defer cancel() - ctx = util.ContextWithStopCh(ctx, stopCh) - glog.Infof("%s controller: syncing item '%s'", ControllerName, key) - if err := c.syncHandler(ctx, key); err != nil { - return err - } - c.queue.Forget(obj) - return nil - }(obj) - - if err != nil { - glog.Errorf("%s controller: Re-queuing item %q due to error processing: %s", ControllerName, key, err.Error()) - c.queue.AddRateLimited(obj) - continue - } - - glog.Infof("%s controller: Finished processing work item %q", ControllerName, key) - } - glog.V(4).Infof("Exiting %q worker loop", ControllerName) -} - -func (c *Controller) processNextWorkItem(ctx context.Context, key string) error { - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil - } - - ch, err := c.challengeLister.Challenges(namespace).Get(name) - - if err != nil { - if k8sErrors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("ch '%s' in work queue no longer exists", key)) - return nil - } - - return err - } - - return c.Sync(ctx, ch) -} - -var keyFunc = controllerpkg.KeyFunc - -const ( - ControllerName = "challenges-scheduler" -) - -func init() { - controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context) controllerpkg.Interface { - return New(ctx).Run - }) -} diff --git a/pkg/controller/acmechallenges/scheduler/scheduler.go b/pkg/controller/acmechallenges/scheduler/scheduler.go new file mode 100644 index 000000000..3788e68cc --- /dev/null +++ b/pkg/controller/acmechallenges/scheduler/scheduler.go @@ -0,0 +1,250 @@ +/* +Copyright 2018 The Jetstack cert-manager contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "sort" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/labels" + + "github.com/jetstack/cert-manager/pkg/acme" + cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1" + cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1" +) + +const ( + // MaxConcurrentChallenges is the total maximum number of challenges that + // can be scheduled as 'processing' at once. + MaxConcurrentChallenges = 60 +) + +// Scheduler implements an ACME challenge scheduler that applies heuristics +// to challenge resources in order to determine which challenges should be +// processing at a given time. +type Scheduler struct { + challengeLister cmlisters.ChallengeLister +} + +// New will construct a new instance of a scheduler +func New(l cmlisters.ChallengeLister) *Scheduler { + return &Scheduler{challengeLister: l} +} + +// ScheduleN will return a maximum of N challenge resources that should be +// scheduled for processing. +// It may return an empty list if there are no challenges that can/should be +// scheduled. +func (s *Scheduler) ScheduleN(n int) ([]*cmapi.Challenge, error) { + // Get a list of all challenges from the cache + allChallenges, err := s.challengeLister.List(labels.Everything()) + if err != nil { + return nil, err + } + + return s.scheduleN(n, allChallenges) +} + +func (s *Scheduler) scheduleN(n int, allChallenges []*cmapi.Challenge) ([]*cmapi.Challenge, error) { + // Determine the list of challenges that could feasibly be scheduled on + // this pass of the scheduler. 
+ // This function returns a list of candidates sorted by creation timestamp. + candidates, inProgressChallengeCount, err := s.determineChallengeCandidates(allChallenges) + if err != nil { + return nil, err + } + + numberToSelect := n + remainingNumberAllowedChallenges := MaxConcurrentChallenges - inProgressChallengeCount + if numberToSelect > remainingNumberAllowedChallenges { + numberToSelect = remainingNumberAllowedChallenges + } + + candidates, err = s.selectChallengesToSchedule(candidates, numberToSelect) + if err != nil { + return nil, err + } + + return candidates, nil +} + +// selectChallengesToSchedule will apply some sorting heuristic to the allowed +// challenge candidates and return a maximum of N challenges that should be +// scheduled for processing. +func (s *Scheduler) selectChallengesToSchedule(candidates []*cmapi.Challenge, n int) ([]*cmapi.Challenge, error) { + // Trim the candidates returned to 'n' + if len(candidates) > n { + candidates = candidates[:n] + } + return candidates, nil +} + +// determineChallengeCandidates will determine which, if any, challenges can +// be scheduled given the current state of items to be scheduled and currently +// processing. +// The returned challenges will be sorted in ascending order based on timestamp +// (i.e. the oldest challenge will be element zero). +func (s *Scheduler) determineChallengeCandidates(allChallenges []*cmapi.Challenge) ([]*cmapi.Challenge, int, error) { + // consider the entire set of challenges for 'in progress', in case a challenge + // has processing=true whilst still being in a 'final' state + inProgress := processingChallenges(allChallenges) + inProgressChallengeCount := len(inProgress) + + // Ensure we only run a max of MaxConcurrentChallenges at a time + // We perform this check here to avoid extra processing if we've already + // hit the maximum number of challenges. 
+ if inProgressChallengeCount >= MaxConcurrentChallenges { + glog.V(4).Infof("There are currently %d running challenges, with a maximum configured of %d. Refusing to schedule more challenges.", len(inProgress), MaxConcurrentChallenges) + return []*cmapi.Challenge{}, inProgressChallengeCount, nil + } + + // Calculate incomplete challenges + incomplete := incompleteChallenges(allChallenges) + // This is the list that we will be filtering/scheduling from + unfilteredCandidates := notProcessingChallenges(incomplete) + + // Never process multiple challenges for the same domain and solver type + // at any one time + // In-place deduplication: https://github.com/golang/go/wiki/SliceTricks + dedupedCandidates := dedupeChallenges(unfilteredCandidates) + + // If there are any already in-progress challenges for a domain and type, + // filter them out. + candidates := quickFilterChallenges(dedupedCandidates, func(ch *cmapi.Challenge) bool { + for _, inPCh := range inProgress { + if compareChallenges(ch, inPCh) == 0 { + glog.V(6).Infof("There is already a challenge processing for domain %q (type %q)", ch.Spec.DNSName, ch.Spec.Type) + return false + } + } + return true + }) + + // Finally, sort the challenges by timestamp to ensure a stable output + sortChallengesByTimestamp(candidates) + + return candidates, inProgressChallengeCount, nil +} + +func sortChallengesByTimestamp(chs []*cmapi.Challenge) { + sort.Slice(chs, func(i, j int) bool { + return chs[i].CreationTimestamp.Before(&chs[j].CreationTimestamp) + }) +} + +// notProcessingChallenges will filter out challenges from the given slice +// that have status.processing set to true. +func notProcessingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge { + return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool { + return !ch.Status.Processing + }) +} + +// processingChallenges will filter out challenges from the given slice +// that have status.processing set to false.
+func processingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge { + return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool { + return ch.Status.Processing + }) +} + +// incompleteChallenges will filter out challenges from the given slice +// that are in a 'final' state +func incompleteChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge { + return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool { + return !acme.IsFinalState(ch.Status.State) + }) +} + +// quickFilterChallenges returns a newly allocated slice holding the elements of chs for which fn returns true; chs itself is left unmodified so callers can safely filter the same input more than once. +func quickFilterChallenges(chs []*cmapi.Challenge, fn func(ch *cmapi.Challenge) bool) []*cmapi.Challenge { + ret := make([]*cmapi.Challenge, 0, len(chs)) + for _, ch := range chs { + if fn(ch) { + ret = append(ret, ch) + } + } + + return ret +} + +// compareChallenges is used to compare two challenge resources. +// If two resources are 'equal', they will not be scheduled at the same time +// as they could cause a conflict. +func compareChallenges(l, r *cmapi.Challenge) int { + if l.Spec.DNSName < r.Spec.DNSName { + return -1 + } + if l.Spec.DNSName > r.Spec.DNSName { + return 1 + } + + if l.Spec.Type < r.Spec.Type { + return -1 + } + if l.Spec.Type > r.Spec.Type { + return 1 + } + + // TODO: check the http01.ingressClass attribute and allow two challenges + // with different ingress classes specified to be scheduled at once + + // TODO: check the dns01.provider attribute and allow two challenges with + // different providers to be scheduled at once + + return 0 +} + +// sortChallenges will sort the provided list of challenges according to the +// scheduler's sorting heuristics.
+// This is used to make deduplication of list items efficient (see dedupeChallenges) +func sortChallenges(chs []*cmapi.Challenge) { + sort.Slice(chs, func(i, j int) bool { + cmp := compareChallenges(chs[i], chs[j]) + if cmp != 0 { + return cmp == -1 + } + + // we have to take the creation timestamp into account when sorting if + // the other fields already match + if chs[i].CreationTimestamp.Time.UnixNano() < chs[j].CreationTimestamp.Time.UnixNano() { + return true + } + if chs[i].CreationTimestamp.Time.UnixNano() > chs[j].CreationTimestamp.Time.UnixNano() { + return false + } + + return false + }) +} + +// https://github.com/golang/go/wiki/SliceTricks#In-place-deduplicate-comparable +func dedupeChallenges(in []*cmapi.Challenge) []*cmapi.Challenge { + sortChallenges(in) + j := 0 + for i := 1; i < len(in); i++ { + if compareChallenges(in[j], in[i]) == 0 { + continue + } + j++ + in[i], in[j] = in[j], in[i] + } + if len(in) == 0 { + return in + } + return in[:j+1] +} diff --git a/pkg/controller/acmechallenges/scheduler/scheduler_test.go b/pkg/controller/acmechallenges/scheduler/scheduler_test.go new file mode 100644 index 000000000..83ba97842 --- /dev/null +++ b/pkg/controller/acmechallenges/scheduler/scheduler_test.go @@ -0,0 +1,298 @@ +/* +Copyright 2018 The Jetstack cert-manager contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "fmt" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/diff" + + cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1" + "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/fake" + cminformers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions" + "github.com/jetstack/cert-manager/pkg/util" + "github.com/jetstack/cert-manager/test/unit/gen" +) + +func randomChallenge(rand int) *cmapi.Challenge { + if rand == 0 { + rand = 10 + } + return gen.Challenge("test-"+util.RandStringRunes(10), + gen.SetChallengeDNSName(util.RandStringRunes(rand)), + gen.SetChallengeType("http-01")) +} + +func randomChallengeN(n int, rand int) []*cmapi.Challenge { + chs := make([]*cmapi.Challenge, n) + for i := range chs { + chs[i] = randomChallenge(rand) + } + return chs +} + +func ascendingChallengeN(n int, mods ...gen.ChallengeModifier) []*cmapi.Challenge { + chs := make([]*cmapi.Challenge, n) + for i := range chs { + name := fmt.Sprintf("test-%d", i) + chs[i] = gen.Challenge(name, + gen.SetChallengeDNSName(name), + gen.SetChallengeType("http-01")) + chs[i].CreationTimestamp = metav1.NewTime(time.Unix(int64(i), 0)) + for _, m := range mods { + m(chs[i]) + } + } + return chs +} + +func withCreationTimestamp(i int64) func(*cmapi.Challenge) { + return func(ch *cmapi.Challenge) { + ch.CreationTimestamp.Time = time.Unix(i, 0) + } +} + +func BenchmarkScheduleAscending(b *testing.B) { + counts := []int{10, 100, 1000, 10000, 100000, 1000000} + for _, c := range counts { + b.Run(fmt.Sprintf("With %d challenges to schedule", c), func(b *testing.B) { + chs := ascendingChallengeN(c) + s := &Scheduler{} + b.ResetTimer() + for n := 0; n < b.N; n++ { + s.scheduleN(30, chs) + } + }) + } +} + +func BenchmarkScheduleRandom(b *testing.B) { + counts := []int{10, 100, 1000, 10000, 100000, 1000000} + for _, c := range counts { + b.Run(fmt.Sprintf("With %d 
random challenges to schedule", c), func(b *testing.B) { + chs := randomChallengeN(c, 0) + s := &Scheduler{} + b.ResetTimer() + for n := 0; n < b.N; n++ { + s.scheduleN(30, chs) + } + }) + } +} + +func BenchmarkScheduleDuplicates(b *testing.B) { + counts := []int{10, 100, 1000, 10000, 100000, 1000000} + for _, c := range counts { + b.Run(fmt.Sprintf("With %d random but likely duplicate challenges to schedule", c), func(b *testing.B) { + chs := randomChallengeN(c, 3) + s := &Scheduler{} + b.ResetTimer() + for n := 0; n < b.N; n++ { + s.scheduleN(30, chs) + } + }) + } +} + +func TestScheduleN(t *testing.T) { + tests := []struct { + name string + n int + challenges []*cmapi.Challenge + expected []*cmapi.Challenge + err bool + }{ + { + name: "schedule a single challenge", + n: 5, + challenges: ascendingChallengeN(1), + expected: ascendingChallengeN(1), + }, + { + name: "schedule a maximum of N challenges", + n: 5, + challenges: ascendingChallengeN(10), + expected: ascendingChallengeN(5), + }, + { + name: "schedule a maximum of MaxConcurrentChallenges", + n: MaxConcurrentChallenges * 2, + challenges: ascendingChallengeN(MaxConcurrentChallenges * 2), + expected: ascendingChallengeN(MaxConcurrentChallenges), + }, + { + name: "schedule duplicate challenge if second challenge is in a final state", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test", + gen.SetChallengeDNSName("example.com")), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeState(cmapi.Valid)), + }, + expected: []*cmapi.Challenge{ + gen.Challenge("test", + gen.SetChallengeDNSName("example.com")), + }, + }, + { + name: "schedule a single duplicate in CreationTimestamp order", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(2)), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(1)), + }, + expected: []*cmapi.Challenge{ + 
gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(1)), + }, + }, + { + name: "schedule duplicate in CreationTimestamp order (inverted input)", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(1)), + gen.Challenge("test", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(2)), + }, + expected: []*cmapi.Challenge{ + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + withCreationTimestamp(1)), + }, + }, + { + name: "schedule duplicate challenges for the same domain if they have a different type", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test1", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("dns01")), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("http01")), + }, + expected: []*cmapi.Challenge{ + gen.Challenge("test1", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("dns01")), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("http01")), + }, + }, + { + name: "schedule duplicate challenges for the same domain if they have a different type (inverted input)", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("http01")), + gen.Challenge("test1", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("dns01")), + }, + expected: []*cmapi.Challenge{ + gen.Challenge("test1", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("dns01")), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeType("http01")), + }, + }, + { + name: "don't schedule when total number of scheduled challenges exceeds global maximum", + n: 5, + challenges: append( + ascendingChallengeN(MaxConcurrentChallenges, gen.SetChallengeProcessing(true)), + randomChallengeN(5, 0)..., + ), + 
}, + { + name: "don't schedule challenge if another one with the same dnsName exists", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test", + gen.SetChallengeDNSName("example.com")), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeProcessing(true)), + }, + }, + { + name: "don't schedule anything if all challenges are processing", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeProcessing(true)), + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeProcessing(true)), + }, + }, + { + name: "don't schedule anything if all challenges are in a final state", + n: 5, + challenges: []*cmapi.Challenge{ + gen.Challenge("test2", + gen.SetChallengeDNSName("example.com"), + gen.SetChallengeState(cmapi.Valid)), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cl := fake.NewSimpleClientset() + factory := cminformers.NewSharedInformerFactory(cl, 0) + challengesInformer := factory.Certmanager().V1alpha1().Challenges() + for _, ch := range test.challenges { + challengesInformer.Informer().GetIndexer().Add(ch) + } + + s := New(challengesInformer.Lister()) + + if test.expected == nil { + test.expected = []*cmapi.Challenge{} + } + chs, err := s.ScheduleN(test.n) + if err != nil && !test.err { + t.Errorf("expected no error, but got: %v", err) + } + if err == nil && test.err { + t.Errorf("expected to get an error, but got none") + } + if !reflect.DeepEqual(chs, test.expected) { + t.Errorf("expected did not match actual: %v", diff.ObjectDiff(test.expected, chs)) + } + }) + } +} diff --git a/pkg/controller/acmechallenges/scheduler/sync.go b/pkg/controller/acmechallenges/scheduler/sync.go deleted file mode 100644 index dbfd65362..000000000 --- a/pkg/controller/acmechallenges/scheduler/sync.go +++ /dev/null @@ -1,116 +0,0 @@ -/* -Copyright 2018 The Jetstack cert-manager contributors. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package scheduler - -import ( - "context" - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/tools/cache" - - "github.com/jetstack/cert-manager/pkg/acme" - cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1" -) - -// Sync will process a single ACME challenge resource in order to determine -// whether it can be scheduled for processing. -// This is currently extremelly primitive, and **will not** do intelligent -// things like bumping challenges that are for already expired or nearing expiry -// certificates to the 'front' of the queue. -// -// This may be something to do in future - we could use a resyncFunc to build -// a stack of challenges to process, and upon observation of a new challenge, -// re-evaluate the whole stack. -// -// For now, this function will simply be used to solve https://github.com/jetstack/cert-manager/issues/951 -func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) error { - // If the challenge is already in a final state, there is nothing more for - // us to do. - if acme.IsFinalState(ch.Status.State) { - return nil - } - - // If the challenge already has 'processing' set to true, there is nothing - // more for us to do. - // The 'acmechallenges' controller is responsible for setting this field to - // false once processing has completed. - if ch.Status.Processing == true { - return nil - } - - // Begin the scheduling algorithm! 
Here, we must evaluate all challenges - // currently in the apiserver, and their current state, in order to determine - // whether we can begin processing this challenge. - - allChallenges, err := c.challengeLister.List(labels.Everything()) - if err != nil { - return err - } - - // First, filter out all challenges that are *not* being processed. - // With our naive scheduling algorithm, we only care about avoiding *duplicate* - // challenges occurring at once. - inFlightChallenges := removeNotProcessingChallenges(allChallenges) - - // if any other challenges are in-flight with the same challenge type and - // same dnsName, we will *not* mark this challenge as processing - for _, inFCh := range inFlightChallenges { - if ch.Spec.DNSName == inFCh.Spec.DNSName && ch.Spec.Type == inFCh.Spec.Type { - return fmt.Errorf("another %q challenge for challenge %q (domain %q) is in progress, waiting until it is complete before processing", ch.Spec.Type, ch.Name, ch.Spec.DNSName) - } - } - - // if there are no 'conflicts' detected above, then we can mark this challenge - // as processing. - ch.Status.Processing = true - ch, err = c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch) - if err != nil { - return err - } - - c.Recorder.Event(ch, corev1.EventTypeNormal, "Started", "Challenge scheduled for processing") - - // we ignore the return value from waitForCacheSync - if it is false, the - // controller will shutdown anyway. - _ = c.waitForCacheSync() - - return nil -} - -// removeNotProcessingChallenges will filter out challenges from the given slice -// that have status.processing set to false. -// TODO: we currently call this function on every call to Sync(). -// In large deployments, this could cause high CPU and memory consumption as it -// works at O(n^2) complexity (i.e. for every challenge, we have to touch every -// challenge). 
-func removeNotProcessingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge { - // TODO: there's probably a more efficient way to manage this that doesn't - // involve constructing large slices and using append. - var ret []*cmapi.Challenge - for _, ch := range chs { - if ch.Status.Processing { - ret = append(ret, ch) - } - } - return ret -} - -func (c *Controller) waitForCacheSync() bool { - return cache.WaitForCacheSync(c.stopCh, c.challengesHasSynced) -} diff --git a/pkg/controller/acmechallenges/scheduler/sync_test.go b/pkg/controller/acmechallenges/scheduler/sync_test.go deleted file mode 100644 index 15ae0e13e..000000000 --- a/pkg/controller/acmechallenges/scheduler/sync_test.go +++ /dev/null @@ -1,120 +0,0 @@ -/* -Copyright 2018 The Jetstack cert-manager contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package scheduler - -import ( - "context" - "testing" - - "k8s.io/apimachinery/pkg/runtime" - coretesting "k8s.io/client-go/testing" - - "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1" - "github.com/jetstack/cert-manager/pkg/controller/test" - testpkg "github.com/jetstack/cert-manager/pkg/controller/test" - "github.com/jetstack/cert-manager/test/unit/gen" -) - -type fixture struct { - Controller *Controller - *test.Builder - - Challenge *v1alpha1.Challenge - - PreFn func(*testing.T, *fixture) - CheckFn func(*testing.T, *fixture, ...interface{}) - Err bool - - Ctx context.Context -} - -func TestSync(t *testing.T) { - tests := map[string]fixture{ - "with one challenge in api, mark processing=true": { - Builder: &testpkg.Builder{ - CertManagerObjects: []runtime.Object{ - gen.Challenge("test", - gen.SetChallengeDNSName("example.com"))}, - ExpectedActions: []testpkg.Action{ - testpkg.NewAction(coretesting.NewUpdateAction(v1alpha1.SchemeGroupVersion.WithResource("challenges"), gen.DefaultTestNamespace, - gen.Challenge("test", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeProcessing(true)))), - }, - }, - Challenge: gen.Challenge("test", gen.SetChallengeDNSName("example.com")), - }, - "when a duplicate challenge exists in the API, and is processing, don't mark next one as processing": { - Builder: &testpkg.Builder{ - CertManagerObjects: []runtime.Object{ - gen.Challenge("test", - gen.SetChallengeDNSName("example.com")), - gen.Challenge("test2", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeProcessing(true)), - }, - }, - Challenge: gen.Challenge("test", gen.SetChallengeDNSName("example.com")), - Err: true, - }, - "skip elements that are already marked as processing=true": { - Builder: &testpkg.Builder{ - CertManagerObjects: []runtime.Object{ - gen.Challenge("test", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeProcessing(true)), - }, - }, - Challenge: gen.Challenge("test", - 
gen.SetChallengeDNSName("example.com"), - gen.SetChallengeProcessing(true)), - }, - "skip elements that are already in a final state": { - Builder: &testpkg.Builder{ - CertManagerObjects: []runtime.Object{ - gen.Challenge("test", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeState(v1alpha1.Invalid)), - gen.Challenge("test2", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeProcessing(true)), - }, - }, - Challenge: gen.Challenge("test", - gen.SetChallengeDNSName("example.com"), - gen.SetChallengeState(v1alpha1.Invalid)), - }, - } - - for n, test := range tests { - t.Run(n, func(t *testing.T) { - if test.Builder == nil { - test.Builder = &testpkg.Builder{} - } - test.Setup(t) - chalCopy := test.Challenge.DeepCopy() - err := test.Controller.Sync(test.Ctx, chalCopy) - if err != nil && !test.Err { - t.Errorf("Expected function to not error, but got: %v", err) - } - if err == nil && test.Err { - t.Errorf("Expected function to get an error, but got: %v", err) - } - test.Finish(t, chalCopy, err) - }) - } -} diff --git a/pkg/controller/acmechallenges/scheduler/util_test.go b/pkg/controller/acmechallenges/scheduler/util_test.go deleted file mode 100644 index 49d541c8e..000000000 --- a/pkg/controller/acmechallenges/scheduler/util_test.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2018 The Jetstack cert-manager contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package scheduler - -import ( - "context" - "testing" - - "github.com/jetstack/cert-manager/pkg/controller/test" -) - -func (f *fixture) Setup(t *testing.T) { - if f.Ctx == nil { - f.Ctx = context.Background() - } - if f.Builder == nil { - f.Builder = &test.Builder{} - } - f.Controller = f.buildFakeController(f.Builder) - if f.PreFn != nil { - f.PreFn(t, f) - f.Builder.Sync() - } -} - -func (f *fixture) Finish(t *testing.T, args ...interface{}) { - defer f.Builder.Stop() - if err := f.Builder.AllReactorsCalled(); err != nil { - t.Errorf("Not all expected reactors were called: %v", err) - } - if err := f.Builder.AllActionsExecuted(); err != nil { - t.Errorf(err.Error()) - } - - // resync listers before running checks - f.Builder.Sync() - // run custom checks - if f.CheckFn != nil { - f.CheckFn(t, f, args...) - } -} - -func (f *fixture) buildFakeController(b *test.Builder) *Controller { - b.Start() - c := New(b.Context) - b.Sync() - return c -}