remove os.Exit from cert-manager controller and make sure LeaderElection ReleaseOnCancel works

Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com>
Inteon 2021-07-27 21:40:42 +02:00
parent 48e9c2bd16
commit d430113666
3 changed files with 101 additions and 73 deletions


@@ -145,14 +145,12 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) error
         return nil
     })
-    // Don't launch the controllers unless we have been elected leader
-    <-mgr.Elected()
-    // Exit early if the Elected channel gets closed because we are shutting down.
     select {
-    case <-gctx.Done():
+    case <-gctx.Done(): // Exit early if the Elected channel gets closed because we are shutting down.
         // Wait for error group to complete and return
         return g.Wait()
-    default:
+    case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader
+        // Continue with setting up controller
     }
     // Retry the start up of the certificate based controller in case the
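The old injector code blocked unconditionally on `<-mgr.Elected()`, so a shutdown signal that arrived before leadership was acquired left the goroutine stuck; folding both channels into one select lets shutdown win the race. A minimal sketch of the pattern, using a stand-in manager type (controller-runtime's real Manager exposes the same `Elected() <-chan struct{}`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// manager is a stand-in for controller-runtime's Manager, whose Elected()
// channel is closed once this instance becomes the leader.
type manager struct{ elected chan struct{} }

func (m *manager) Elected() <-chan struct{} { return m.elected }

func waitForLeadership(ctx context.Context, m *manager) error {
	select {
	case <-ctx.Done():
		// Shutting down before ever being elected: exit early instead of
		// blocking forever on m.Elected().
		return ctx.Err()
	case <-m.Elected():
		// Elected leader: safe to continue with controller setup.
		return nil
	}
}

func main() {
	m := &manager{elected: make(chan struct{})}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	// Never elected in this run, so the shutdown path unblocks us.
	fmt.Println(waitForLeadership(ctx, m)) // prints "context deadline exceeded"
}
```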


@@ -18,6 +18,7 @@ package app
 import (
     "context"
+    "errors"
     "fmt"
     "net"
     "net/http"
@@ -65,8 +66,10 @@ const controllerAgentName = "cert-manager"
 //and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629
 const resyncPeriod = 10 * time.Hour
-func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
+func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
     rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh)
+    rootCtx, cancelContext := context.WithCancel(rootCtx)
+    defer cancelContext()
     g, rootCtx := errgroup.WithContext(rootCtx)
     rootCtx = logf.NewContext(rootCtx, nil, "controller")
     log := logf.FromContext(rootCtx)
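The reworked preamble layers a cancelable context under an errgroup: the first goroutine to fail cancels every sibling, `defer cancelContext()` runs on every return path, and `g.Wait()` surfaces the first error. A minimal self-contained sketch of that wiring (names are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs on every return path, unlike os.Exit

	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		return errors.New("controller failed") // cancels ctx for the whole group
	})
	g.Go(func() error {
		<-ctx.Done() // observes the cancellation triggered by the failure
		return nil
	})

	fmt.Println(g.Wait()) // prints "controller failed"
}
```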
@@ -74,7 +77,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
     ctx, kubeCfg, err := buildControllerContext(rootCtx, opts)
     if err != nil {
         log.Error(err, "error building controller context", "options", opts)
-        os.Exit(1)
+        return err
     }
     enabledControllers := opts.EnabledControllers()
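Swapping `os.Exit(1)` for `return err` here is more than style: `os.Exit` terminates the process without running deferred functions, so the `defer cancelContext()` above, and with it the lease release that ReleaseOnCancel is supposed to trigger, would never execute. A self-contained illustration:

```go
package main

import (
	"fmt"
	"os"
)

// fail mimics the old code path: exit immediately, skipping all defers.
func fail() {
	defer fmt.Println("releasing leader election lease") // never runs
	os.Exit(1)
}

// failGracefully mimics the new code path: return the error so deferred
// cleanup (context cancellation, lease release) still runs.
func failGracefully() error {
	defer fmt.Println("releasing leader election lease") // always runs
	return fmt.Errorf("error building controller context")
}

func main() {
	if err := failGracefully(); err != nil {
		fmt.Println("shutting down:", err)
	}
	// Calling fail() instead would print nothing before the process dies.
}
```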
@@ -83,7 +86,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
     ln, err := net.Listen("tcp", opts.MetricsListenAddress)
     if err != nil {
         log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress)
-        os.Exit(1)
+        return err
     }
     server := ctx.Metrics.NewServer(ln, opts.EnablePprof)
@@ -106,65 +109,99 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
         return nil
     })
-    run := func(_ context.Context) {
-        for n, fn := range controller.Known() {
-            log := log.WithValues("controller", n)
-            // only run a controller if it's been enabled
-            if !enabledControllers.Has(n) {
-                log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
-                continue
-            }
-            // don't run clusterissuers controller if scoped to a single namespace
-            if ctx.Namespace != "" && n == clusterissuers.ControllerName {
-                log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace")
-                continue
-            }
-            iface, err := fn(ctx)
+    elected := make(chan struct{})
+    if opts.LeaderElect {
+        g.Go(func() error {
+            log.V(logf.InfoLevel).Info("starting leader election")
+            leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
             if err != nil {
-                log.Error(err, "error starting controller")
-                os.Exit(1)
+                log.Error(err, "error creating leader election client")
+                return err
             }
-            g.Go(func() error {
-                log.V(logf.InfoLevel).Info("starting controller")
+            errorCh := make(chan error, 1)
+            if err := startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, leaderelection.LeaderCallbacks{
+                OnStartedLeading: func(_ context.Context) {
+                    close(elected)
+                },
+                OnStoppedLeading: func() {
+                    select {
+                    case <-rootCtx.Done():
+                        // context was canceled, just return
+                        return
+                    default:
+                        log.V(logf.ErrorLevel).Info("leader election lost")
+                        errorCh <- errors.New("leader election lost")
+                    }
+                },
+            }); err != nil {
+                return err
+            }
-                workers := 5
-                return iface.Run(workers, rootCtx.Done())
-            })
+            select {
+            case err := <-errorCh:
+                return err
+            default:
+                return nil
+            }
+        })
+    } else {
+        close(elected)
+    }
+    select {
+    case <-rootCtx.Done(): // Exit early if the Elected channel gets closed because we are shutting down.
+        // Wait for error group to complete and return
+        return g.Wait()
+    case <-elected: // Don't launch the controllers unless we have been elected leader
+        // Continue with setting up controller
+    }
+    for n, fn := range controller.Known() {
+        log := log.WithValues("controller", n)
+        // only run a controller if it's been enabled
+        if !enabledControllers.Has(n) {
+            log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
+            continue
         }
-        log.V(logf.DebugLevel).Info("starting shared informer factories")
-        // TODO: we should wait for these informers to finish
-        ctx.SharedInformerFactory.Start(rootCtx.Done())
-        ctx.KubeSharedInformerFactory.Start(rootCtx.Done())
-        ctx.GWShared.Start(rootCtx.Done())
+        // don't run clusterissuers controller if scoped to a single namespace
+        if ctx.Namespace != "" && n == clusterissuers.ControllerName {
+            log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace")
+            continue
+        }
-        err := g.Wait()
+        iface, err := fn(ctx)
         if err != nil {
             log.Error(err, "error starting controller")
-            os.Exit(1)
+            cancelContext()
+            _ = g.Wait() // Don't process errors, we already have an error
+            return err
         }
-        log.V(logf.InfoLevel).Info("control loops exited")
-        os.Exit(0)
+        g.Go(func() error {
+            log.V(logf.InfoLevel).Info("starting controller")
+            workers := 5
+            return iface.Run(workers, rootCtx.Done())
+        })
     }
-    if !opts.LeaderElect {
-        run(context.TODO())
-        return
-    }
+    log.V(logf.DebugLevel).Info("starting shared informer factories")
+    ctx.SharedInformerFactory.Start(rootCtx.Done())
+    ctx.KubeSharedInformerFactory.Start(rootCtx.Done())
+    ctx.GWShared.Start(rootCtx.Done())
-    log.V(logf.InfoLevel).Info("starting leader election")
-    leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
+    err = g.Wait()
     if err != nil {
-        log.Error(err, "error creating leader election client")
-        os.Exit(1)
+        log.Error(err, "error starting controller")
+        return err
     }
+    log.V(logf.InfoLevel).Info("control loops exited")
-    startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, run)
+    return nil
 }
 func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) {
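Taken together, the new Run decouples leader election from controller startup through the `elected` channel: `OnStartedLeading` closes it once leadership is won, it is closed immediately when leader election is disabled, and startup selects on it against the root context so shutdown can always win. A minimal sketch of that pattern (the flag and timings are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, leaderElect bool) error {
	elected := make(chan struct{})
	if leaderElect {
		go func() {
			// Stand-in for OnStartedLeading closing the channel once this
			// replica wins the election.
			time.Sleep(50 * time.Millisecond)
			close(elected)
		}()
	} else {
		// No election configured: proceed immediately.
		close(elected)
	}

	select {
	case <-ctx.Done():
		// Shutting down before becoming leader: never start controllers.
		return ctx.Err()
	case <-elected:
		// Leadership acquired (or not required): start controllers here.
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(run(ctx, true)) // prints <nil> once "elected"
}
```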
@@ -303,14 +340,14 @@ func buildControllerContext(ctx context.Context, opts *options.ControllerOptions
     }, kubeCfg, nil
 }
-func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, run func(context.Context)) {
+func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error {
     log := logf.FromContext(ctx, "leader-election")
     // Identity used to distinguish between multiple controller manager instances
     id, err := os.Hostname()
     if err != nil {
         log.Error(err, "error getting hostname")
-        os.Exit(1)
+        return err
     }
     // Set up Multilock for leader election. This Multilock is here for the
@@ -331,29 +368,23 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l
     if err != nil {
         // We should never get here.
         log.Error(err, "error creating leader election lock")
-        os.Exit(1)
+        return err
     }
     // Try and become the leader and start controller manager loops
-    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
         Lock:            ml,
         LeaseDuration:   opts.LeaderElectionLeaseDuration,
         RenewDeadline:   opts.LeaderElectionRenewDeadline,
         RetryPeriod:     opts.LeaderElectionRetryPeriod,
         ReleaseOnCancel: true,
-        Callbacks: leaderelection.LeaderCallbacks{
-            OnStartedLeading: run,
-            OnStoppedLeading: func() {
-                select {
-                case <-ctx.Done():
-                    // context was canceled, just return
-                    return
-                default:
-                    log.V(logf.ErrorLevel).Info("leader election lost")
-                    os.Exit(1)
-                }
-            },
-        },
+        Callbacks:       callbacks,
     })
+    if err != nil {
+        return err
+    }
+    le.Run(ctx)
+    return nil
 }
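Replacing `leaderelection.RunOrDie` with `leaderelection.NewLeaderElector` is what makes ReleaseOnCancel effective: RunOrDie gives the callbacks no error path, which is why the old code reached for `os.Exit`, killing the process before the lock could be released, whereas `le.Run(ctx)` returns when the context is canceled and, with `ReleaseOnCancel: true`, releases the lease first. A sketch against client-go's leaderelection API (the lease name, namespace, and durations are illustrative, not taken from the commit):

```go
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func startLeaderElection(ctx context.Context, client kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
	id, err := os.Hostname()
	if err != nil {
		return err // was: os.Exit(1)
	}

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "kube-system", Name: "cert-manager-controller"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   40 * time.Second,
		RetryPeriod:     15 * time.Second,
		ReleaseOnCancel: true, // release the lease on ctx cancel instead of waiting for expiry
		Callbacks:       callbacks,
	})
	if err != nil {
		return err // was: RunOrDie, which cannot return an error
	}

	// Run blocks until leadership is lost or ctx is canceled; with
	// ReleaseOnCancel it releases the lease before returning.
	le.Run(ctx)
	return nil
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes running inside a cluster
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // triggers the ReleaseOnCancel path on shutdown
	_ = startLeaderElection(ctx, kubernetes.NewForConfigOrDie(cfg), leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) { /* start controllers */ },
		OnStoppedLeading: func() { /* stop work */ },
	})
}
```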


@@ -73,8 +73,7 @@ to renew certificates at an appropriate time before expiry.`,
         }
         logf.Log.V(logf.InfoLevel).Info("starting controller", "version", util.AppVersion, "git-commit", util.AppGitCommit)
-        o.RunCertManagerController(stopCh)
-        return nil
+        return o.RunCertManagerController(stopCh)
     },
 }
@@ -91,6 +90,6 @@ func (o CertManagerControllerOptions) Validate(args []string) error {
     return utilerrors.NewAggregate(errors)
 }
-func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) {
-    Run(o.ControllerOptions, stopCh)
+func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) error {
+    return Run(o.ControllerOptions, stopCh)
 }
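With `RunCertManagerController` returning an error, failures now travel through cobra's `RunE` to a single exit point in `main`, after all deferred cleanup has run. A sketch of that shape (the command name and error are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func run(stopCh <-chan struct{}) error {
	// Stand-in for Run(o.ControllerOptions, stopCh).
	return errors.New("controller exited with error")
}

func main() {
	stopCh := make(chan struct{})
	cmd := &cobra.Command{
		Use:           "cert-manager-controller",
		SilenceErrors: true, // we print the error ourselves below
		SilenceUsage:  true,
		RunE: func(cmd *cobra.Command, args []string) error {
			return run(stopCh) // was: run(...); return nil
		},
	}
	if err := cmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1) // the only os.Exit left, at the very top of the stack
	}
}
```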