Add leader election. Fix graceful exit.

This commit is contained in:
James Munnelly 2017-09-09 01:26:45 +01:00
parent 0e1f331c1e
commit 960d46e302
9 changed files with 291 additions and 179 deletions

View File

@ -0,0 +1,174 @@
package app
import (
"fmt"
"os"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"github.com/jetstack-experimental/cert-manager/cmd/controller/app/options"
clientset "github.com/jetstack-experimental/cert-manager/pkg/client"
intscheme "github.com/jetstack-experimental/cert-manager/pkg/client/scheme"
"github.com/jetstack-experimental/cert-manager/pkg/controller"
"github.com/jetstack-experimental/cert-manager/pkg/issuer"
"github.com/jetstack-experimental/cert-manager/pkg/kube"
)
// controllerAgentName is the component name attached to the Events emitted
// through the recorder created in Run.
const controllerAgentName = "cert-manager-controller"
// Run builds the API clients, event recorder and controller context described
// by opts and then runs every registered controller until the process exits.
//
// When opts.LeaderElect is false the controllers are started immediately.
// Otherwise an Endpoints-based leader election lock is created in
// opts.LeaderElectionNamespace and the controllers are only started once this
// instance has become the leader.
//
// Run does not return an error: every failure, as well as the controllers
// finishing or leadership being lost, is reported through glog.Fatalf.
func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
// Add cert-manager types to the default Kubernetes Scheme so Events can be
// logged properly
intscheme.AddToScheme(scheme.Scheme)
// Load the user's Kubernetes config
kubeCfg, err := KubeConfig(opts.APIServerHost)
if err != nil {
glog.Fatalf("error creating rest config: %s", err.Error())
}
// Create a cert-manager api client
intcl, err := clientset.NewForConfig(kubeCfg)
if err != nil {
glog.Fatalf("error creating internal group client: %s", err.Error())
}
// Create a Kubernetes api client
cl, err := kubernetes.NewForConfig(kubeCfg)
if err != nil {
glog.Fatalf("error creating kubernetes client: %s", err.Error())
}
// Create an event broadcaster that both logs events (at verbosity 4) and
// records them through the Kubernetes client created above
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.V(4).Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cl.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerAgentName})
// The same informer factory is handed to both the controllers and the
// issuer factory.
sharedInformerFactory := kube.NewSharedInformerFactory()
controllerCtx := &controller.Context{
Client: cl,
CMClient: intcl,
SharedInformerFactory: sharedInformerFactory,
IssuerFactory: issuer.NewFactory(&issuer.Context{
Client: cl,
CMClient: intcl,
SharedInformerFactory: sharedInformerFactory,
Namespace: opts.Namespace,
}),
Namespace: opts.Namespace,
}
// run constructs and starts all known controllers and blocks until they
// have all returned. The channel passed to run is ignored; the controllers
// and the informer factory are stopped by the outer stopCh instead.
run := func(_ <-chan struct{}) {
var wg sync.WaitGroup
var controllers = make(map[string]controller.Interface)
// Construct every registered controller before starting any of them.
for n, fn := range controller.Known() {
controllers[n] = fn(controllerCtx)
}
for n, fn := range controllers {
wg.Add(1)
// n and fn are passed as arguments so each goroutine gets its own copy.
go func(n string, fn controller.Interface) {
defer wg.Done()
glog.V(4).Infof("Starting %s controller", n)
// Each controller is run with 2 workers.
err := fn(2, stopCh)
if err != nil {
glog.Fatalf("error running %s controller: %s", n, err.Error())
}
}(n, fn)
}
glog.V(4).Infof("Starting shared informer factory")
controllerCtx.SharedInformerFactory.Start(stopCh)
wg.Wait()
// Reached only once every controller has returned without an error.
glog.Fatalf("Control loops exited")
}
if !opts.LeaderElect {
run(stopCh)
// NOTE(review): run ends in glog.Fatalf, so the lines below look
// unreachable in practice. Receiving from a fresh channel that is never
// closed would block forever.
<-make(chan struct{})
panic("unreachable")
}
leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
if err != nil {
glog.Fatalf("error creating leader election client: %s", err.Error())
}
// Identity used to distinguish between multiple cert-manager controller instances
id, err := os.Hostname()
if err != nil {
glog.Fatalf("error getting hostname: %s", err.Error())
}
// Lock required for leader election
rl := resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: opts.LeaderElectionNamespace,
Name: "cert-manager-controller",
},
Client: leaderElectionClient.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id + "-external-cert-manager-controller",
EventRecorder: recorder,
},
}
// Try and become the leader and start the cert-manager control loops
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: opts.LeaderElectionLeaseDuration,
RenewDeadline: opts.LeaderElectionRenewDeadline,
RetryPeriod: opts.LeaderElectionRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
// Losing leadership is treated as fatal.
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
}
// KubeConfig returns a rest.Config for communicating with the Kubernetes API
// server. The configuration is resolved in the following order:
//
//  1. If apiServerHost is non-empty, a config that only sets Host to that
//     value (and therefore carries no authentication) is returned.
//  2. Otherwise the in-cluster config is used if it can be loaded.
//  3. Failing that, the config is built from the user's local kubeconfig
//     using the default client config loading rules.
//
// An error is only returned when the kubeconfig fallback in step 3 fails.
func KubeConfig(apiServerHost string) (*rest.Config, error) {
	// An explicitly supplied host takes precedence over any autodetection.
	if apiServerHost != "" {
		hostCfg := new(rest.Config)
		hostCfg.Host = apiServerHost
		return hostCfg, nil
	}

	// The in-cluster error is intentionally dropped: failing here simply
	// means we fall back to the local kubeconfig below.
	if inClusterCfg, inClusterErr := rest.InClusterConfig(); inClusterErr == nil {
		return inClusterCfg, nil
	}

	rawCfg, loadErr := clientcmd.NewDefaultClientConfigLoadingRules().Load()
	if loadErr != nil {
		return nil, fmt.Errorf("error loading cluster config: %s", loadErr.Error())
	}

	clientCfg, clientErr := clientcmd.NewDefaultClientConfig(*rawCfg, &clientcmd.ConfigOverrides{}).ClientConfig()
	if clientErr != nil {
		return nil, fmt.Errorf("error loading cluster client config: %s", clientErr.Error())
	}
	return clientCfg, nil
}

View File

@ -0,0 +1,73 @@
package options
import (
"time"
"github.com/spf13/pflag"
)
// ControllerOptions holds the command line configurable settings of the
// cert-manager controller. The fields are bound to flags by AddFlags.
type ControllerOptions struct {
// APIServerHost is the optional apiserver address (--master).
APIServerHost string
// Namespace optionally limits the controller to one namespace (--namespace).
Namespace string
// LeaderElect toggles leader election between instances (--leader-elect).
LeaderElect bool
// LeaderElectionNamespace is the namespace used to perform leader election
// (--leader-election-namespace).
LeaderElectionNamespace string
// LeaderElectionLeaseDuration is set by --leader-election-lease-duration.
LeaderElectionLeaseDuration time.Duration
// LeaderElectionRenewDeadline is set by --leader-election-renew-deadline.
LeaderElectionRenewDeadline time.Duration
// LeaderElectionRetryPeriod is set by --leader-election-retry-period.
LeaderElectionRetryPeriod time.Duration
}
// Default values for the fields of ControllerOptions, used by
// NewControllerOptions and as flag defaults in AddFlags.
const (
// An empty host means no explicit apiserver address is configured.
defaultAPIServerHost = ""
// An empty namespace means no single namespace is selected.
defaultNamespace = ""
// Leader election is enabled by default, using the kube-system namespace.
defaultLeaderElect = true
defaultLeaderElectionNamespace = "kube-system"
// Default leader election timings.
defaultLeaderElectionLeaseDuration = 15 * time.Second
defaultLeaderElectionRenewDeadline = 10 * time.Second
defaultLeaderElectionRetryPeriod = 2 * time.Second
)
// NewControllerOptions returns a freshly allocated ControllerOptions with
// every field initialised to its package default.
func NewControllerOptions() *ControllerOptions {
	opts := new(ControllerOptions)

	// Connection and scope defaults.
	opts.APIServerHost = defaultAPIServerHost
	opts.Namespace = defaultNamespace

	// Leader election defaults.
	opts.LeaderElect = defaultLeaderElect
	opts.LeaderElectionNamespace = defaultLeaderElectionNamespace
	opts.LeaderElectionLeaseDuration = defaultLeaderElectionLeaseDuration
	opts.LeaderElectionRenewDeadline = defaultLeaderElectionRenewDeadline
	opts.LeaderElectionRetryPeriod = defaultLeaderElectionRetryPeriod

	return opts
}
// AddFlags registers the controller's command line flags on fs and binds each
// of them to the matching field of o. Every flag defaults to the
// corresponding default* constant of this package, so the defaults reported
// in the help output always agree with NewControllerOptions.
func (o *ControllerOptions) AddFlags(fs *pflag.FlagSet) {
	fs.StringVar(&o.APIServerHost, "master", defaultAPIServerHost, ""+
		"Optional apiserver host address to connect to. If not specified, autoconfiguration "+
		"will be attempted.")
	// The default value is the third argument and the usage text the fourth;
	// defaultNamespace must not be prepended to the usage string.
	fs.StringVar(&o.Namespace, "namespace", defaultNamespace, ""+
		"Optional namespace to monitor resources within. This can be used to limit the scope "+
		"of cert-manager to a single namespace. If not specified, all namespaces will be watched")

	fs.BoolVar(&o.LeaderElect, "leader-elect", defaultLeaderElect, ""+
		"If true, cert-manager will perform leader election between instances to ensure no more "+
		"than one instance of cert-manager operates at a time")
	fs.StringVar(&o.LeaderElectionNamespace, "leader-election-namespace", defaultLeaderElectionNamespace, ""+
		"Namespace used to perform leader election. Only used if leader election is enabled")
	fs.DurationVar(&o.LeaderElectionLeaseDuration, "leader-election-lease-duration", defaultLeaderElectionLeaseDuration, ""+
		"The duration that non-leader candidates will wait after observing a leadership "+
		"renewal until attempting to acquire leadership of a led but unrenewed leader "+
		"slot. This is effectively the maximum duration that a leader can be stopped "+
		"before it is replaced by another candidate. This is only applicable if leader "+
		"election is enabled.")
	fs.DurationVar(&o.LeaderElectionRenewDeadline, "leader-election-renew-deadline", defaultLeaderElectionRenewDeadline, ""+
		"The interval between attempts by the acting master to renew a leadership slot "+
		"before it stops leading. This must be less than or equal to the lease duration. "+
		"This is only applicable if leader election is enabled.")
	fs.DurationVar(&o.LeaderElectionRetryPeriod, "leader-election-retry-period", defaultLeaderElectionRetryPeriod, ""+
		"The duration the clients should wait between attempting acquisition and renewal "+
		"of a leadership. This is only applicable if leader election is enabled.")
}
// Validate checks the options for consistency. No checks are implemented
// yet, so it always returns nil.
func (o *ControllerOptions) Validate() error {
return nil
}

View File

@ -17,25 +17,22 @@ import (
"flag"
"os"
"os/signal"
"runtime"
"syscall"
"github.com/golang/glog"
_ "github.com/jetstack-experimental/cert-manager/pkg/issuer/acme"
"github.com/jetstack-experimental/cert-manager/pkg/logs"
)
func main() {
logs.InitLogs()
defer logs.FlushLogs()
if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU())
}
stopCh := SetupSignalHandler()
cmd := NewCommandStartCertManagerController(os.Stdout, os.Stderr, stopCh)
cmd.Flags().AddGoFlagSet(flag.CommandLine)
flag.CommandLine.Parse([]string{})
if err := cmd.Execute(); err != nil {
glog.Fatal(err)
}

View File

@ -1,21 +0,0 @@
package main
import "github.com/spf13/pflag"
// ControllerOptions holds the command line configurable settings of the
// controller. The fields are bound to flags by AddFlags.
type ControllerOptions struct {
// APIServerHost is the optional apiserver address (--master).
APIServerHost string
// Namespace optionally limits the controller to one namespace (--namespace).
Namespace string
}
// Validate checks the options for consistency. No checks are implemented,
// so it always returns nil.
func (o *ControllerOptions) Validate() error {
return nil
}
// AddFlags registers the --master and --namespace flags on fs, binding them
// to the APIServerHost and Namespace fields. Both flags default to the empty
// string.
func (s *ControllerOptions) AddFlags(fs *pflag.FlagSet) {
	// Help texts are kept together here so the registrations below stay short.
	const (
		masterUsage = "" +
			"Optional apiserver host address to connect to. If not specified, autoconfiguration " +
			"will be attempted."
		namespaceUsage = "" +
			"Optional namespace to monitor resources within. THis can be used to limit the scope " +
			"of cert-manager to a single namespace. If not specified, all namespaces will be watched"
	)

	fs.StringVar(&s.APIServerHost, "master", "", masterUsage)
	fs.StringVar(&s.Namespace, "namespace", "", namespaceUsage)
}

View File

@ -1,26 +1,22 @@
package main
import (
"fmt"
"io"
"github.com/golang/glog"
"github.com/spf13/cobra"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
intclient "github.com/jetstack-experimental/cert-manager/pkg/client"
"github.com/jetstack-experimental/cert-manager/pkg/controller"
"github.com/jetstack-experimental/cert-manager/cmd/controller/app"
"github.com/jetstack-experimental/cert-manager/cmd/controller/app/options"
_ "github.com/jetstack-experimental/cert-manager/pkg/controller/certificates"
_ "github.com/jetstack-experimental/cert-manager/pkg/controller/issuers"
"github.com/jetstack-experimental/cert-manager/pkg/issuer"
_ "github.com/jetstack-experimental/cert-manager/pkg/issuer/acme"
"github.com/jetstack-experimental/cert-manager/pkg/kube"
)
type CertManagerControllerOptions struct {
ControllerOptions *ControllerOptions
ControllerOptions *options.ControllerOptions
StdOut io.Writer
StdErr io.Writer
@ -28,7 +24,7 @@ type CertManagerControllerOptions struct {
func NewCertManagerControllerOptions(out, errOut io.Writer) *CertManagerControllerOptions {
o := &CertManagerControllerOptions{
ControllerOptions: &ControllerOptions{},
ControllerOptions: options.NewControllerOptions(),
StdOut: out,
StdErr: errOut,
@ -52,17 +48,11 @@ It will ensure certificates are valid and up to date periodically, and attempt
to renew certificates at an appropriate time before expiry.`,
// TODO: Refactor this function from this package
RunE: func(cmd *cobra.Command, args []string) error {
if err := o.Complete(); err != nil {
return err
}
Run: func(cmd *cobra.Command, args []string) {
if err := o.Validate(args); err != nil {
return err
glog.Fatalf("error validating options: %s", err.Error())
}
if err := o.RunCertManagerController(stopCh); err != nil {
return err
}
return nil
o.RunCertManagerController(stopCh)
},
}
@ -78,86 +68,6 @@ func (o CertManagerControllerOptions) Validate(args []string) error {
return utilerrors.NewAggregate(errors)
}
func (o *CertManagerControllerOptions) Complete() error {
return nil
}
func (o CertManagerControllerOptions) Context() (*controller.Context, error) {
// Load the users Kubernetes config
cfg, err := KubeConfig(o.ControllerOptions.APIServerHost)
if err != nil {
return nil, fmt.Errorf("error creating rest config: %s", err.Error())
}
// Create a Navigator api client
intcl, err := intclient.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("error creating internal group client: %s", err.Error())
}
// Create a Kubernetes api client
cl, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes client: %s", err.Error())
}
sharedInformerFactory := kube.NewSharedInformerFactory()
issuerCtx := &issuer.Context{
Client: cl,
CMClient: intcl,
SharedInformerFactory: sharedInformerFactory,
Namespace: o.ControllerOptions.Namespace,
}
// Create a context for controllers to use
ctx := &controller.Context{
Client: cl,
CMClient: intcl,
SharedInformerFactory: sharedInformerFactory,
IssuerFactory: issuer.NewFactory(issuerCtx),
Namespace: o.ControllerOptions.Namespace,
}
return ctx, nil
}
// KubeConfig will return a rest.Config for communicating with the Kubernetes API server.
// If apiServerHost is specified, a config without authentication that is configured
// to talk to the apiServerHost URL will be returned. Else, the in-cluster config will be loaded,
// and failing this, the config will be loaded from the users local kubeconfig directory
func KubeConfig(apiServerHost string) (*rest.Config, error) {
var err error
var cfg *rest.Config
if len(apiServerHost) > 0 {
cfg = new(rest.Config)
cfg.Host = apiServerHost
} else if cfg, err = rest.InClusterConfig(); err != nil {
apiCfg, err := clientcmd.NewDefaultClientConfigLoadingRules().Load()
if err != nil {
return nil, fmt.Errorf("error loading cluster config: %s", err.Error())
}
cfg, err = clientcmd.NewDefaultClientConfig(*apiCfg, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
return nil, fmt.Errorf("error loading cluster client config: %s", err.Error())
}
}
return cfg, nil
}
func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) error {
ctx, err := o.Context()
if err != nil {
return err
}
// Start all known controller loops
return controller.Start(ctx, controller.Known(), stopCh)
func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) {
app.Run(o.ControllerOptions, stopCh)
}

View File

@ -2,7 +2,6 @@ package certificates
import (
"fmt"
"log"
"time"
corev1 "k8s.io/api/core/v1"
@ -19,6 +18,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/golang/glog"
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager"
"github.com/jetstack-experimental/cert-manager/pkg/client"
controllerpkg "github.com/jetstack-experimental/cert-manager/pkg/controller"
@ -130,31 +130,31 @@ func (c *Controller) ingressDeleted(obj interface{}) {
}
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer c.queue.ShutDown()
log.Printf("Starting control loop")
// wait for all the informer caches we depend on are synced
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend to sync
if !cache.WaitForCacheSync(stopCh,
c.secretInformerSynced,
c.certificateInformerSynced,
c.ingressInformerSynced) {
// TODO: replace with a call to glog Errorf
log.Printf("error waiting for informer caches to sync")
return
return fmt.Errorf("error waiting for informer caches to sync")
}
glog.V(4).Infof("Synced all caches for %s control loop", ControllerName)
for i := 0; i < workers; i++ {
// TODO (@munnerz): make time.Second duration configurable
go wait.Until(c.worker, time.Second, stopCh)
}
<-stopCh
log.Printf("shutting down queue as workqueue signalled shutdown")
glog.V(4).Infof("Shutting down queue as workqueue signalled shutdown")
return nil
}
func (c *Controller) worker() {
log.Printf("starting worker")
glog.V(4).Infof("Starting %s worker", ControllerName)
for {
obj, shutdown := c.queue.Get()
if shutdown {
@ -177,14 +177,14 @@ func (c *Controller) worker() {
}(obj)
if err != nil {
log.Printf("requeuing item due to error processing: %s", err.Error())
glog.V(2).Infof("Requeuing object due to error processing: %s", err.Error())
c.queue.AddRateLimited(obj)
continue
}
log.Printf("finished processing work item")
glog.V(4).Infof("Finished processing work item")
}
log.Printf("exiting worker loop")
glog.V(4).Infof("Exiting %s worker loop", ControllerName)
}
func (c *Controller) processNextWorkItem(key string) error {
@ -216,8 +216,8 @@ const (
)
func init() {
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context, stopCh <-chan struct{}) (bool, error) {
go New(
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context) controllerpkg.Interface {
return New(
ctx.SharedInformerFactory.InformerFor(
ctx.Namespace,
metav1.GroupVersionKind{Group: certmanager.GroupName, Version: "v1alpha1", Kind: "Certificate"},
@ -251,8 +251,6 @@ func init() {
ctx.Client,
ctx.CMClient,
ctx.IssuerFactory,
).Run(2, stopCh)
return true, nil
).Run
})
}

View File

@ -1,9 +1,6 @@
package controller
import (
"fmt"
"github.com/Sirupsen/logrus"
"k8s.io/client-go/kubernetes"
clientset "github.com/jetstack-experimental/cert-manager/pkg/client"
@ -20,21 +17,3 @@ type Context struct {
Namespace string
}
type InitFn func(ctx *Context, stopCh <-chan struct{}) (bool, error)
func Start(ctx *Context, fns map[string]InitFn, stopCh <-chan struct{}) error {
for n, fn := range fns {
logrus.Debugf("starting %s controller", n)
_, err := fn(ctx, stopCh)
if err != nil {
return fmt.Errorf("error starting '%s' controller: %s", n, err.Error())
}
}
ctx.SharedInformerFactory.Start(stopCh)
select {}
}

View File

@ -16,6 +16,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/golang/glog"
"github.com/jetstack-experimental/cert-manager/pkg/apis/certmanager"
"github.com/jetstack-experimental/cert-manager/pkg/client"
controllerpkg "github.com/jetstack-experimental/cert-manager/pkg/controller"
@ -87,15 +88,14 @@ func (c *Controller) secretDeleted(obj interface{}) {
}
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer c.queue.ShutDown()
log.Printf("Starting control loop")
glog.V(4).Infof("Starting %s control loop", ControllerName)
// wait for all the informer caches we depend on are synced
if !cache.WaitForCacheSync(stopCh, c.issuerInformerSynced, c.secretInformerSynced) {
// TODO: replace with Errorf call to glog
log.Printf("error waiting for informer caches to sync")
return
return fmt.Errorf("error waiting for informer caches to sync")
}
for i := 0; i < workers; i++ {
@ -105,6 +105,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
log.Printf("shutting down queue as workqueue signalled shutdown")
return nil
}
func (c *Controller) worker() {
@ -169,8 +170,8 @@ const (
)
func init() {
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context, stopCh <-chan struct{}) (bool, error) {
go New(
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.Context) controllerpkg.Interface {
return New(
ctx.SharedInformerFactory.InformerFor(
ctx.Namespace,
metav1.GroupVersionKind{Group: certmanager.GroupName, Version: "v1alpha1", Kind: "Issuer"},
@ -194,8 +195,6 @@ func init() {
ctx.Client,
ctx.CMClient,
ctx.IssuerFactory,
).Run(2, stopCh)
return true, nil
).Run
})
}

View File

@ -1,13 +1,16 @@
package controller
type Interface func(workers int, stopCh <-chan struct{}) error
type Constructor func(ctx *Context) Interface
var (
known = make(map[string]InitFn, 0)
known = make(map[string]Constructor, 0)
)
func Known() map[string]InitFn {
func Known() map[string]Constructor {
return known
}
func Register(name string, fn InitFn) {
func Register(name string, fn Constructor) {
known[name] = fn
}