From d430113666d16ad396cbd6cb421d32265d444677 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 27 Jul 2021 21:40:42 +0200 Subject: [PATCH] remove os.Exit from cert-manager controller and make sure LeaderElection ReleaseOnCancel works Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- cmd/cainjector/app/start.go | 10 +- cmd/controller/app/controller.go | 157 ++++++++++++++++++------------- cmd/controller/app/start.go | 7 +- 3 files changed, 101 insertions(+), 73 deletions(-) diff --git a/cmd/cainjector/app/start.go b/cmd/cainjector/app/start.go index 667b7d765..625abb689 100644 --- a/cmd/cainjector/app/start.go +++ b/cmd/cainjector/app/start.go @@ -145,14 +145,12 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) er return nil }) - // Don't launch the controllers unless we have been elected leader - <-mgr.Elected() - - // Exit early if the Elected channel gets closed because we are shutting down. select { - case <-gctx.Done(): + case <-gctx.Done(): // Exit early if the context is canceled because we are shutting down.
+ // Wait for error group to complete and return return g.Wait() - default: + case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader + // Continue with setting up controller } // Retry the start up of the certificate based controller in case the diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 6ec3e8bff..c13be3f2a 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -18,6 +18,7 @@ package app import ( "context" + "errors" "fmt" "net" "net/http" @@ -65,8 +66,10 @@ const controllerAgentName = "cert-manager" //and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629 const resyncPeriod = 10 * time.Hour -func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { +func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh) + rootCtx, cancelContext := context.WithCancel(rootCtx) + defer cancelContext() g, rootCtx := errgroup.WithContext(rootCtx) rootCtx = logf.NewContext(rootCtx, nil, "controller") log := logf.FromContext(rootCtx) @@ -74,7 +77,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { ctx, kubeCfg, err := buildControllerContext(rootCtx, opts) if err != nil { log.Error(err, "error building controller context", "options", opts) - os.Exit(1) + return err } enabledControllers := opts.EnabledControllers() @@ -83,7 +86,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { ln, err := net.Listen("tcp", opts.MetricsListenAddress) if err != nil { log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress) - os.Exit(1) + return err } server := ctx.Metrics.NewServer(ln, opts.EnablePprof) @@ -106,65 +109,99 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { return nil }) - run := func(_ context.Context) { - for n, fn := range 
controller.Known() { - log := log.WithValues("controller", n) - - // only run a controller if it's been enabled - if !enabledControllers.Has(n) { - log.V(logf.InfoLevel).Info("not starting controller as it's disabled") - continue - } - - // don't run clusterissuers controller if scoped to a single namespace - if ctx.Namespace != "" && n == clusterissuers.ControllerName { - log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace") - continue - } - - iface, err := fn(ctx) + elected := make(chan struct{}) + if opts.LeaderElect { + g.Go(func() error { + log.V(logf.InfoLevel).Info("starting leader election") + leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election")) if err != nil { - log.Error(err, "error starting controller") - os.Exit(1) + log.Error(err, "error creating leader election client") + return err } - g.Go(func() error { - log.V(logf.InfoLevel).Info("starting controller") + errorCh := make(chan error, 1) + if err := startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + close(elected) + }, + OnStoppedLeading: func() { + select { + case <-rootCtx.Done(): + // context was canceled, just return + return + default: + log.V(logf.ErrorLevel).Info("leader election lost") + errorCh <- errors.New("leader election lost") + } + }, + }); err != nil { + return err + } - workers := 5 - return iface.Run(workers, rootCtx.Done()) - }) + select { + case err := <-errorCh: + return err + default: + return nil + } + }) + } else { + close(elected) + } + + select { + case <-rootCtx.Done(): // Exit early if the root context is canceled before we have been elected leader.
+ // Wait for error group to complete and return + return g.Wait() + case <-elected: // Don't launch the controllers unless we have been elected leader + // Continue with setting up controller + } + + for n, fn := range controller.Known() { + log := log.WithValues("controller", n) + + // only run a controller if it's been enabled + if !enabledControllers.Has(n) { + log.V(logf.InfoLevel).Info("not starting controller as it's disabled") + continue } - log.V(logf.DebugLevel).Info("starting shared informer factories") - // TODO: we should wait for these informers to finish - ctx.SharedInformerFactory.Start(rootCtx.Done()) - ctx.KubeSharedInformerFactory.Start(rootCtx.Done()) - ctx.GWShared.Start(rootCtx.Done()) + // don't run clusterissuers controller if scoped to a single namespace + if ctx.Namespace != "" && n == clusterissuers.ControllerName { + log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace") + continue + } - err := g.Wait() + iface, err := fn(ctx) if err != nil { log.Error(err, "error starting controller") - os.Exit(1) + + cancelContext() + _ = g.Wait() // Don't process errors, we already have an error + return err } - log.V(logf.InfoLevel).Info("control loops exited") - os.Exit(0) + g.Go(func() error { + log.V(logf.InfoLevel).Info("starting controller") + + workers := 5 + return iface.Run(workers, rootCtx.Done()) + }) } - if !opts.LeaderElect { - run(context.TODO()) - return - } + log.V(logf.DebugLevel).Info("starting shared informer factories") + ctx.SharedInformerFactory.Start(rootCtx.Done()) + ctx.KubeSharedInformerFactory.Start(rootCtx.Done()) + ctx.GWShared.Start(rootCtx.Done()) - log.V(logf.InfoLevel).Info("starting leader election") - leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election")) + err = g.Wait() if err != nil { - log.Error(err, "error creating leader election client") - os.Exit(1) + log.Error(err, "error starting controller") + return err } + 
log.V(logf.InfoLevel).Info("control loops exited") - startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, run) + return nil } func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) { @@ -303,14 +340,14 @@ func buildControllerContext(ctx context.Context, opts *options.ControllerOptions }, kubeCfg, nil } -func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, run func(context.Context)) { +func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error { log := logf.FromContext(ctx, "leader-election") // Identity used to distinguish between multiple controller manager instances id, err := os.Hostname() if err != nil { log.Error(err, "error getting hostname") - os.Exit(1) + return err } // Set up Multilock for leader election. This Multilock is here for the @@ -331,29 +368,23 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l if err != nil { // We should never get here. 
log.Error(err, "error creating leader election lock") - os.Exit(1) - + return err } // Try and become the leader and start controller manager loops - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: ml, LeaseDuration: opts.LeaderElectionLeaseDuration, RenewDeadline: opts.LeaderElectionRenewDeadline, RetryPeriod: opts.LeaderElectionRetryPeriod, ReleaseOnCancel: true, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - select { - case <-ctx.Done(): - // context was canceled, just return - return - default: - log.V(logf.ErrorLevel).Info("leader election lost") - os.Exit(1) - } - }, - }, + Callbacks: callbacks, }) + if err != nil { + return err + } + + le.Run(ctx) + + return nil } diff --git a/cmd/controller/app/start.go b/cmd/controller/app/start.go index 5693d28ab..975b20a86 100644 --- a/cmd/controller/app/start.go +++ b/cmd/controller/app/start.go @@ -73,8 +73,7 @@ to renew certificates at an appropriate time before expiry.`, } logf.Log.V(logf.InfoLevel).Info("starting controller", "version", util.AppVersion, "git-commit", util.AppGitCommit) - o.RunCertManagerController(stopCh) - return nil + return o.RunCertManagerController(stopCh) }, } @@ -91,6 +90,6 @@ func (o CertManagerControllerOptions) Validate(args []string) error { return utilerrors.NewAggregate(errors) } -func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) { - Run(o.ControllerOptions, stopCh) +func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) error { + return Run(o.ControllerOptions, stopCh) }