Switch ACME challenge scheduler to evaluate all challenges at once

Signed-off-by: James Munnelly <james@munnelly.eu>
This commit is contained in:
James Munnelly 2018-11-29 18:01:07 +00:00
parent f4e5203f1c
commit bca6ed6e64
15 changed files with 606 additions and 487 deletions

View File

@ -19,7 +19,6 @@ go_library(
"//cmd/controller/app:go_default_library",
"//cmd/controller/app/options:go_default_library",
"//pkg/controller/acmechallenges:go_default_library",
"//pkg/controller/acmechallenges/scheduler:go_default_library",
"//pkg/controller/acmeorders:go_default_library",
"//pkg/controller/certificates:go_default_library",
"//pkg/controller/clusterissuers:go_default_library",

View File

@ -11,7 +11,6 @@ go_library(
"//pkg/client/clientset/versioned/scheme:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/acmechallenges/scheduler:go_default_library",
"//pkg/issuer/acme/dns/util:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/util:go_default_library",

View File

@ -39,7 +39,6 @@ import (
intscheme "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/scheme"
informers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions"
"github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
dnsutil "github.com/jetstack/cert-manager/pkg/issuer/acme/dns/util"
"github.com/jetstack/cert-manager/pkg/metrics"
"github.com/jetstack/cert-manager/pkg/util"
@ -76,9 +75,6 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
glog.Infof("Starting %s controller", n)
workers := 5
if n == scheduler.ControllerName {
workers = 1
}
err := fn(workers, stopCh)
if err != nil {

View File

@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/acmechallenges:go_default_library",
"//pkg/controller/acmechallenges/scheduler:go_default_library",
"//pkg/controller/acmeorders:go_default_library",
"//pkg/controller/certificates:go_default_library",
"//pkg/controller/clusterissuers:go_default_library",

View File

@ -26,7 +26,6 @@ import (
"github.com/jetstack/cert-manager/pkg/util"
challengescontroller "github.com/jetstack/cert-manager/pkg/controller/acmechallenges"
"github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
orderscontroller "github.com/jetstack/cert-manager/pkg/controller/acmeorders"
certificatescontroller "github.com/jetstack/cert-manager/pkg/controller/certificates"
clusterissuerscontroller "github.com/jetstack/cert-manager/pkg/controller/clusterissuers"
@ -106,7 +105,6 @@ var (
ingressshimcontroller.ControllerName,
orderscontroller.ControllerName,
challengescontroller.ControllerName,
scheduler.ControllerName,
}
)

View File

@ -28,7 +28,6 @@ import (
"github.com/jetstack/cert-manager/cmd/controller/app"
"github.com/jetstack/cert-manager/cmd/controller/app/options"
_ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges"
_ "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
_ "github.com/jetstack/cert-manager/pkg/controller/acmeorders"
_ "github.com/jetstack/cert-manager/pkg/controller/certificates"
_ "github.com/jetstack/cert-manager/pkg/controller/clusterissuers"

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/apis/certmanager/v1alpha1:go_default_library",
"//pkg/client/listers/certmanager/v1alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/acmechallenges/scheduler:go_default_library",
"//pkg/issuer/acme/dns:go_default_library",
"//pkg/issuer/acme/http:go_default_library",
"//pkg/util:go_default_library",

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@ -33,6 +34,7 @@ import (
"github.com/jetstack/cert-manager/pkg/acme"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
controllerpkg "github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler"
"github.com/jetstack/cert-manager/pkg/issuer/acme/dns"
"github.com/jetstack/cert-manager/pkg/issuer/acme/http"
"github.com/jetstack/cert-manager/pkg/util"
@ -60,6 +62,8 @@ type Controller struct {
watchedInformers []cache.InformerSynced
queue workqueue.RateLimitingInterface
scheduler *scheduler.Scheduler
}
func New(ctx *controllerpkg.Context) *Controller {
@ -101,6 +105,7 @@ func New(ctx *controllerpkg.Context) *Controller {
ctrl.httpSolver = http.NewSolver(ctx)
ctrl.dnsSolver = dns.NewSolver(ctx)
ctrl.scheduler = scheduler.New(ctrl.challengeLister)
return ctrl
}
@ -123,6 +128,10 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
},
time.Second, stopCh)
}
// TODO: properly plumb in stopCh and WaitGroup to scheduler
// Run the scheduler once per second
go wait.Until(c.runScheduler, time.Second*1, stopCh)
<-stopCh
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
@ -132,6 +141,47 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
return nil
}
// MaxChallengesPerSchedule is the maximum number of challenges that can be
// scheduled with a single call to the scheduler.
// This provides a very crude rate limit on how many challenges we will schedule
// per second. It may be better to remove this altogether in favour of some
// other method of rate limiting creations.
// TODO: make this configurable
const MaxChallengesPerSchedule = 20
// runScheduler will execute the scheduler's ScheduleN function to determine
// which, if any, challenges should be rescheduled.
// TODO: it should also only re-run the scheduler if a change to challenges has
// been observed, to save needless work
func (c *Controller) runScheduler() {
toSchedule, err := c.scheduler.ScheduleN(MaxChallengesPerSchedule)
if err != nil {
runtime.HandleError(fmt.Errorf("Error determining set of challenges that should be scheduled for processing: %v", err))
return
}
for _, ch := range toSchedule {
ch = ch.DeepCopy()
ch.Status.Processing = true
_, err := c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch)
if err != nil {
runtime.HandleError(fmt.Errorf("Error scheduling challenge %s/%s for processing: %v", ch.Namespace, ch.Name, err))
return
}
c.Recorder.Event(ch, corev1.EventTypeNormal, "Started", "Challenge scheduled for processing")
}
if len(toSchedule) > 0 {
plural := ""
if len(toSchedule) > 1 {
plural = "s"
}
glog.V(4).Infof("Scheduled %d challenge%s for processing", len(toSchedule), plural)
}
}
func (c *Controller) worker(stopCh <-chan struct{}) {
glog.V(4).Infof("Starting %q worker", ControllerName)
for {

View File

@ -2,42 +2,30 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"controller.go",
"sync.go",
],
srcs = ["scheduler.go"],
importpath = "github.com/jetstack/cert-manager/pkg/controller/acmechallenges/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/acme:go_default_library",
"//pkg/apis/certmanager/v1alpha1:go_default_library",
"//pkg/client/listers/certmanager/v1alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"sync_test.go",
"util_test.go",
],
srcs = ["scheduler_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/certmanager/v1alpha1:go_default_library",
"//pkg/controller/test:go_default_library",
"//pkg/client/clientset/versioned/fake:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/util:go_default_library",
"//test/unit/gen:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
],
)

View File

@ -1,160 +0,0 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang/glog"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
controllerpkg "github.com/jetstack/cert-manager/pkg/controller"
"github.com/jetstack/cert-manager/pkg/util"
)
type Controller struct {
controllerpkg.Context
// To allow injection for testing.
syncHandler func(ctx context.Context, key string) error
challengeLister cmlisters.ChallengeLister
stopCh <-chan struct{}
challengesHasSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
func New(ctx *controllerpkg.Context) *Controller {
ctrl := &Controller{Context: *ctx}
ctrl.syncHandler = ctrl.processNextWorkItem
ctrl.queue = workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), "challenges-scheduler")
challengeInformer := ctrl.SharedInformerFactory.Certmanager().V1alpha1().Challenges()
challengeInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: ctrl.queue})
ctrl.challengesHasSynced = challengeInformer.Informer().HasSynced
ctrl.challengeLister = challengeInformer.Lister()
return ctrl
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
c.stopCh = stopCh
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.challengesHasSynced) {
return fmt.Errorf("error waiting for informer caches to sync")
}
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(func() {
defer wg.Done()
c.worker(stopCh)
},
time.Second, stopCh)
}
<-stopCh
glog.V(4).Infof("Shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
glog.V(4).Infof("Waiting for workers to exit...")
wg.Wait()
glog.V(4).Infof("Workers exited.")
return nil
}
func (c *Controller) worker(stopCh <-chan struct{}) {
glog.V(4).Infof("Starting %q worker", ControllerName)
for {
obj, shutdown := c.queue.Get()
if shutdown {
break
}
var key string
err := func(obj interface{}) error {
defer c.queue.Done(obj)
var ok bool
if key, ok = obj.(string); !ok {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = util.ContextWithStopCh(ctx, stopCh)
glog.Infof("%s controller: syncing item '%s'", ControllerName, key)
if err := c.syncHandler(ctx, key); err != nil {
return err
}
c.queue.Forget(obj)
return nil
}(obj)
if err != nil {
glog.Errorf("%s controller: Re-queuing item %q due to error processing: %s", ControllerName, key, err.Error())
c.queue.AddRateLimited(obj)
continue
}
glog.Infof("%s controller: Finished processing work item %q", ControllerName, key)
}
glog.V(4).Infof("Exiting %q worker loop", ControllerName)
}
func (c *Controller) processNextWorkItem(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
ch, err := c.challengeLister.Challenges(namespace).Get(name)
if err != nil {
if k8sErrors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("ch '%s' in work queue no longer exists", key))
return nil
}
return err
}
return c.Sync(ctx, ch)
}
var keyFunc = controllerpkg.KeyFunc
const (
ControllerName = "challenges-scheduler"
)
func init() {
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context) controllerpkg.Interface {
return New(ctx).Run
})
}

View File

@ -0,0 +1,250 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"sort"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/labels"
"github.com/jetstack/cert-manager/pkg/acme"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
cmlisters "github.com/jetstack/cert-manager/pkg/client/listers/certmanager/v1alpha1"
)
const (
// MaxConcurrentChallenges is the total maximum number of challenges that
// can be scheduled as 'processing' at once.
MaxConcurrentChallenges = 60
)
// Scheduler implements an ACME challenge scheduler that applies heuristics
// to challenge resources in order to determine which challenges should be
// processing at a given time.
type Scheduler struct {
challengeLister cmlisters.ChallengeLister
}
// New will construct a new instance of a scheduler
func New(l cmlisters.ChallengeLister) *Scheduler {
return &Scheduler{challengeLister: l}
}
// ScheduleN will return a maximum of N challenge resources that should be
// scheduled for processing.
// It may return an empty list if there are no challenges that can/should be
// scheduled.
func (s *Scheduler) ScheduleN(n int) ([]*cmapi.Challenge, error) {
// Get a list of all challenges from the cache
allChallenges, err := s.challengeLister.List(labels.Everything())
if err != nil {
return nil, err
}
return s.scheduleN(n, allChallenges)
}
func (s *Scheduler) scheduleN(n int, allChallenges []*cmapi.Challenge) ([]*cmapi.Challenge, error) {
// Determine the list of challenges that could feasibly be scheduled on
// this pass of the scheduler.
// This function returns a list of candidates sorted by creation timestamp.
candidates, inProgressChallengeCount, err := s.determineChallengeCandidates(allChallenges)
if err != nil {
return nil, err
}
numberToSelect := n
remainingNumberAllowedChallenges := MaxConcurrentChallenges - inProgressChallengeCount
if numberToSelect > remainingNumberAllowedChallenges {
numberToSelect = remainingNumberAllowedChallenges
}
candidates, err = s.selectChallengesToSchedule(candidates, numberToSelect)
if err != nil {
return nil, err
}
return candidates, nil
}
// selectChallengesToSchedule will apply some sorting heuristic to the allowed
// challenge candidates and return a maximum of N challenges that should be
// scheduled for processing.
func (s *Scheduler) selectChallengesToSchedule(candidates []*cmapi.Challenge, n int) ([]*cmapi.Challenge, error) {
// Trim the candidates returned to 'n'
if len(candidates) > n {
candidates = candidates[:n]
}
return candidates, nil
}
// determineChallengeCandidates will determine which, if any, challenges can
// be scheduled given the current state of items to be scheduled and currently
// processing.
// The returned challenges will be sorted in ascending order based on timestamp
// (i.e. the oldest challenge will be element zero).
func (s *Scheduler) determineChallengeCandidates(allChallenges []*cmapi.Challenge) ([]*cmapi.Challenge, int, error) {
// consider the entire set of challenges for 'in progress', in case a challenge
// has processing=true whilst still being in a 'final' state
inProgress := processingChallenges(allChallenges)
inProgressChallengeCount := len(inProgress)
// Ensure we only run a max of MaxConcurrentChallenges at a time
// We perform this check here to avoid extra processing if we've already
// hit the maximum number of challenges.
if inProgressChallengeCount >= MaxConcurrentChallenges {
glog.V(4).Infof("There are currently %d running challenges, with a maximum configured of %d. Refusing to schedule more challenges.", len(inProgress), MaxConcurrentChallenges)
return []*cmapi.Challenge{}, inProgressChallengeCount, nil
}
// Calculate incomplete challenges
incomplete := incompleteChallenges(allChallenges)
// This is the list that we will be filtering/scheduling from
unfilteredCandidates := notProcessingChallenges(incomplete)
// Never process multiple challenges for the same domain and solver type
// at any one time
// In-place deduplication: https://github.com/golang/go/wiki/SliceTricks
dedupedCandidates := dedupeChallenges(unfilteredCandidates)
// If there are any already in-progress challenges for a domain and type,
// filter them out.
candidates := quickFilterChallenges(dedupedCandidates, func(ch *cmapi.Challenge) bool {
for _, inPCh := range inProgress {
if compareChallenges(ch, inPCh) == 0 {
glog.V(6).Infof("There is already a challenge processing for domain %q (type %q)", ch.Spec.DNSName, ch.Spec.Type)
return false
}
}
return true
})
// Finally, sorted the challenges by timestamp to ensure a stable output
sortChallengesByTimestamp(candidates)
return candidates, inProgressChallengeCount, nil
}
func sortChallengesByTimestamp(chs []*cmapi.Challenge) {
sort.Slice(chs, func(i, j int) bool {
return chs[i].CreationTimestamp.Before(&chs[j].CreationTimestamp)
})
}
// notProcessingChallenges will filter out challenges from the given slice
// that have status.processing set to true.
func notProcessingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge {
return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool {
return !ch.Status.Processing
})
}
// processingChallenges will filter out challenges from the given slice
// that have status.processing set to false.
func processingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge {
return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool {
return ch.Status.Processing
})
}
// incompleteChallenges will filter out challenges from the given slice
// that are in a 'final' state
func incompleteChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge {
return quickFilterChallenges(chs, func(ch *cmapi.Challenge) bool {
return !acme.IsFinalState(ch.Status.State)
})
}
// https://github.com/golang/go/wiki/SliceTricks#Filtering-without-allocating
func quickFilterChallenges(chs []*cmapi.Challenge, fn func(ch *cmapi.Challenge) bool) []*cmapi.Challenge {
ret := chs[:0]
for _, ch := range chs {
if fn(ch) {
ret = append(ret, ch)
}
}
return ret
}
// compareChallenges is used to compare two challenge resources.
// If two resources are 'equal', they will not be scheduled at the same time
// as they could cause a conflict.
func compareChallenges(l, r *cmapi.Challenge) int {
if l.Spec.DNSName < r.Spec.DNSName {
return -1
}
if l.Spec.DNSName > r.Spec.DNSName {
return 1
}
if l.Spec.Type < r.Spec.Type {
return -1
}
if l.Spec.Type > r.Spec.Type {
return 1
}
// TODO: check the http01.ingressClass attribute and allow two challenges
// with different ingress classes specified to be scheduled at once
// TODO: check the dns01.provider attribute and allow two challenges with
// different providers to be scheduled at once
return 0
}
// sortChallenges will sort the provided list of challenges according to the
// schedulers sorting heuristics.
// This is used to make deduplication of list items efficient (see dedupeChallenges)
func sortChallenges(chs []*cmapi.Challenge) {
sort.Slice(chs, func(i, j int) bool {
cmp := compareChallenges(chs[i], chs[j])
if cmp != 0 {
return cmp == -1
}
// we have to take the creation timestamp into account when sorting if
// the other fields already match
if chs[i].CreationTimestamp.Time.UnixNano() < chs[j].CreationTimestamp.Time.UnixNano() {
return true
}
if chs[i].CreationTimestamp.Time.UnixNano() > chs[j].CreationTimestamp.Time.UnixNano() {
return false
}
return false
})
}
// https://github.com/golang/go/wiki/SliceTricks#In-place-deduplicate-comparable
func dedupeChallenges(in []*cmapi.Challenge) []*cmapi.Challenge {
sortChallenges(in)
j := 0
for i := 1; i < len(in); i++ {
if compareChallenges(in[j], in[i]) == 0 {
continue
}
j++
in[i], in[j] = in[j], in[i]
}
if len(in) == 0 {
return in
}
return in[:j+1]
}

View File

@ -0,0 +1,298 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"reflect"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
"github.com/jetstack/cert-manager/pkg/client/clientset/versioned/fake"
cminformers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions"
"github.com/jetstack/cert-manager/pkg/util"
"github.com/jetstack/cert-manager/test/unit/gen"
)
func randomChallenge(rand int) *cmapi.Challenge {
if rand == 0 {
rand = 10
}
return gen.Challenge("test-"+util.RandStringRunes(10),
gen.SetChallengeDNSName(util.RandStringRunes(rand)),
gen.SetChallengeType("http-01"))
}
func randomChallengeN(n int, rand int) []*cmapi.Challenge {
chs := make([]*cmapi.Challenge, n)
for i := range chs {
chs[i] = randomChallenge(rand)
}
return chs
}
func ascendingChallengeN(n int, mods ...gen.ChallengeModifier) []*cmapi.Challenge {
chs := make([]*cmapi.Challenge, n)
for i := range chs {
name := fmt.Sprintf("test-%d", i)
chs[i] = gen.Challenge(name,
gen.SetChallengeDNSName(name),
gen.SetChallengeType("http-01"))
chs[i].CreationTimestamp = metav1.NewTime(time.Unix(int64(i), 0))
for _, m := range mods {
m(chs[i])
}
}
return chs
}
func withCreationTimestamp(i int64) func(*cmapi.Challenge) {
return func(ch *cmapi.Challenge) {
ch.CreationTimestamp.Time = time.Unix(i, 0)
}
}
func BenchmarkScheduleAscending(b *testing.B) {
counts := []int{10, 100, 1000, 10000, 100000, 1000000}
for _, c := range counts {
b.Run(fmt.Sprintf("With %d challenges to schedule", c), func(b *testing.B) {
chs := ascendingChallengeN(c)
s := &Scheduler{}
b.ResetTimer()
for n := 0; n < b.N; n++ {
s.scheduleN(30, chs)
}
})
}
}
func BenchmarkScheduleRandom(b *testing.B) {
counts := []int{10, 100, 1000, 10000, 100000, 1000000}
for _, c := range counts {
b.Run(fmt.Sprintf("With %d random challenges to schedule", c), func(b *testing.B) {
chs := randomChallengeN(c, 0)
s := &Scheduler{}
b.ResetTimer()
for n := 0; n < b.N; n++ {
s.scheduleN(30, chs)
}
})
}
}
func BenchmarkScheduleDuplicates(b *testing.B) {
counts := []int{10, 100, 1000, 10000, 100000, 1000000}
for _, c := range counts {
b.Run(fmt.Sprintf("With %d random but likely duplicate challenges to schedule", c), func(b *testing.B) {
chs := randomChallengeN(c, 3)
s := &Scheduler{}
b.ResetTimer()
for n := 0; n < b.N; n++ {
s.scheduleN(30, chs)
}
})
}
}
func TestScheduleN(t *testing.T) {
tests := []struct {
name string
n int
challenges []*cmapi.Challenge
expected []*cmapi.Challenge
err bool
}{
{
name: "schedule a single challenge",
n: 5,
challenges: ascendingChallengeN(1),
expected: ascendingChallengeN(1),
},
{
name: "schedule a maximum of N challenges",
n: 5,
challenges: ascendingChallengeN(10),
expected: ascendingChallengeN(5),
},
{
name: "schedule a maximum of MaxConcurrentChallenges",
n: MaxConcurrentChallenges * 2,
challenges: ascendingChallengeN(MaxConcurrentChallenges * 2),
expected: ascendingChallengeN(MaxConcurrentChallenges),
},
{
name: "schedule duplicate challenge if second challenge is in a final state",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(cmapi.Valid)),
},
expected: []*cmapi.Challenge{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com")),
},
},
{
name: "schedule a single duplicate in CreationTimestamp order",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(2)),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(1)),
},
expected: []*cmapi.Challenge{
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(1)),
},
},
{
name: "schedule duplicate in CreationTimestamp order (inverted input)",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(1)),
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(2)),
},
expected: []*cmapi.Challenge{
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
withCreationTimestamp(1)),
},
},
{
name: "schedule duplicate challenges for the same domain if they have a different type",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test1",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("dns01")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("http01")),
},
expected: []*cmapi.Challenge{
gen.Challenge("test1",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("dns01")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("http01")),
},
},
{
name: "schedule duplicate challenges for the same domain if they have a different type (inverted input)",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("http01")),
gen.Challenge("test1",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("dns01")),
},
expected: []*cmapi.Challenge{
gen.Challenge("test1",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("dns01")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeType("http01")),
},
},
{
name: "don't schedule when total number of scheduled challenges exceeds global maximum",
n: 5,
challenges: append(
ascendingChallengeN(MaxConcurrentChallenges, gen.SetChallengeProcessing(true)),
randomChallengeN(5, 0)...,
),
},
{
name: "don't schedule challenge if another one with the same dnsName exists",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
{
name: "don't schedule anything if all challenges are processing",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
{
name: "don't schedule anything if all challenges are in a final state",
n: 5,
challenges: []*cmapi.Challenge{
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(cmapi.Valid)),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cl := fake.NewSimpleClientset()
factory := cminformers.NewSharedInformerFactory(cl, 0)
challengesInformer := factory.Certmanager().V1alpha1().Challenges()
for _, ch := range test.challenges {
challengesInformer.Informer().GetIndexer().Add(ch)
}
s := New(challengesInformer.Lister())
if test.expected == nil {
test.expected = []*cmapi.Challenge{}
}
chs, err := s.ScheduleN(test.n)
if err != nil && !test.err {
t.Errorf("expected no error, but got: %v", err)
}
if err == nil && test.err {
t.Errorf("expected to get an error, but got none")
}
if !reflect.DeepEqual(chs, test.expected) {
t.Errorf("expected did not match actual: %v", diff.ObjectDiff(test.expected, chs))
}
})
}
}

View File

@ -1,116 +0,0 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"github.com/jetstack/cert-manager/pkg/acme"
cmapi "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
)
// Sync will process a single ACME challenge resource in order to determine
// whether it can be scheduled for processing.
// This is currently extremelly primitive, and **will not** do intelligent
// things like bumping challenges that are for already expired or nearing expiry
// certificates to the 'front' of the queue.
//
// This may be something to do in future - we could use a resyncFunc to build
// a stack of challenges to process, and upon observation of a new challenge,
// re-evaluate the whole stack.
//
// For now, this function will simply be used to solve https://github.com/jetstack/cert-manager/issues/951
func (c *Controller) Sync(ctx context.Context, ch *cmapi.Challenge) error {
// If the challenge is already in a final state, there is nothing more for
// us to do.
if acme.IsFinalState(ch.Status.State) {
return nil
}
// If the challenge already has 'processing' set to true, there is nothing
// more for us to do.
// The 'acmechallenges' controller is responsible for setting this field to
// false once processing has completed.
if ch.Status.Processing == true {
return nil
}
// Begin the scheduling algorithm! Here, we must evaluate all challenges
// currently in the apiserver, and their current state, in order to determine
// whether we can begin processing this challenge.
allChallenges, err := c.challengeLister.List(labels.Everything())
if err != nil {
return err
}
// First, filter out all challenges that are *not* being processed.
// With our naive scheduling algorithm, we only care about avoiding *duplicate*
// challenges occurring at once.
inFlightChallenges := removeNotProcessingChallenges(allChallenges)
// if any other challenges are in-flight with the same challenge type and
// same dnsName, we will *not* mark this challenge as processing
for _, inFCh := range inFlightChallenges {
if ch.Spec.DNSName == inFCh.Spec.DNSName && ch.Spec.Type == inFCh.Spec.Type {
return fmt.Errorf("another %q challenge for challenge %q (domain %q) is in progress, waiting until it is complete before processing", ch.Spec.Type, ch.Name, ch.Spec.DNSName)
}
}
// if there are no 'conflicts' detected above, then we can mark this challenge
// as processing.
ch.Status.Processing = true
ch, err = c.CMClient.CertmanagerV1alpha1().Challenges(ch.Namespace).Update(ch)
if err != nil {
return err
}
c.Recorder.Event(ch, corev1.EventTypeNormal, "Started", "Challenge scheduled for processing")
// we ignore the return value from waitForCacheSync - if it is false, the
// controller will shutdown anyway.
_ = c.waitForCacheSync()
return nil
}
// removeNotProcessingChallenges will filter out challenges from the given slice
// that have status.processing set to false.
// TODO: we currently call this function on every call to Sync().
// In large deployments, this could cause high CPU and memory consumption as it
// works at O(n^2) complexity (i.e. for every challenge, we have to touch every
// challenge).
func removeNotProcessingChallenges(chs []*cmapi.Challenge) []*cmapi.Challenge {
// TODO: there's probably a more efficient way to manage this that doesn't
// involve constructing large slices and using append.
var ret []*cmapi.Challenge
for _, ch := range chs {
if ch.Status.Processing {
ret = append(ret, ch)
}
}
return ret
}
func (c *Controller) waitForCacheSync() bool {
return cache.WaitForCacheSync(c.stopCh, c.challengesHasSynced)
}

View File

@ -1,120 +0,0 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"testing"
"k8s.io/apimachinery/pkg/runtime"
coretesting "k8s.io/client-go/testing"
"github.com/jetstack/cert-manager/pkg/apis/certmanager/v1alpha1"
"github.com/jetstack/cert-manager/pkg/controller/test"
testpkg "github.com/jetstack/cert-manager/pkg/controller/test"
"github.com/jetstack/cert-manager/test/unit/gen"
)
type fixture struct {
Controller *Controller
*test.Builder
Challenge *v1alpha1.Challenge
PreFn func(*testing.T, *fixture)
CheckFn func(*testing.T, *fixture, ...interface{})
Err bool
Ctx context.Context
}
func TestSync(t *testing.T) {
tests := map[string]fixture{
"with one challenge in api, mark processing=true": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"))},
ExpectedActions: []testpkg.Action{
testpkg.NewAction(coretesting.NewUpdateAction(v1alpha1.SchemeGroupVersion.WithResource("challenges"), gen.DefaultTestNamespace,
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)))),
},
},
Challenge: gen.Challenge("test", gen.SetChallengeDNSName("example.com")),
},
"when a duplicate challenge exists in the API, and is processing, don't mark next one as processing": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com")),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test", gen.SetChallengeDNSName("example.com")),
Err: true,
},
"skip elements that are already marked as processing=true": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
"skip elements that are already in a final state": {
Builder: &testpkg.Builder{
CertManagerObjects: []runtime.Object{
gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(v1alpha1.Invalid)),
gen.Challenge("test2",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeProcessing(true)),
},
},
Challenge: gen.Challenge("test",
gen.SetChallengeDNSName("example.com"),
gen.SetChallengeState(v1alpha1.Invalid)),
},
}
for n, test := range tests {
t.Run(n, func(t *testing.T) {
if test.Builder == nil {
test.Builder = &testpkg.Builder{}
}
test.Setup(t)
chalCopy := test.Challenge.DeepCopy()
err := test.Controller.Sync(test.Ctx, chalCopy)
if err != nil && !test.Err {
t.Errorf("Expected function to not error, but got: %v", err)
}
if err == nil && test.Err {
t.Errorf("Expected function to get an error, but got: %v", err)
}
test.Finish(t, chalCopy, err)
})
}
}

View File

@ -1,62 +0,0 @@
/*
Copyright 2018 The Jetstack cert-manager contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"testing"
"github.com/jetstack/cert-manager/pkg/controller/test"
)
func (f *fixture) Setup(t *testing.T) {
if f.Ctx == nil {
f.Ctx = context.Background()
}
if f.Builder == nil {
f.Builder = &test.Builder{}
}
f.Controller = f.buildFakeController(f.Builder)
if f.PreFn != nil {
f.PreFn(t, f)
f.Builder.Sync()
}
}
func (f *fixture) Finish(t *testing.T, args ...interface{}) {
defer f.Builder.Stop()
if err := f.Builder.AllReactorsCalled(); err != nil {
t.Errorf("Not all expected reactors were called: %v", err)
}
if err := f.Builder.AllActionsExecuted(); err != nil {
t.Errorf(err.Error())
}
// resync listers before running checks
f.Builder.Sync()
// run custom checks
if f.CheckFn != nil {
f.CheckFn(t, f, args...)
}
}
func (f *fixture) buildFakeController(b *test.Builder) *Controller {
b.Start()
c := New(b.Context)
b.Sync()
return c
}