package app

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"

	"github.com/jetstack/cert-manager/cmd/controller/app/options"
	clientset "github.com/jetstack/cert-manager/pkg/client/clientset/versioned"
	intscheme "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/scheme"
	informers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions"
	"github.com/jetstack/cert-manager/pkg/controller"
	"github.com/jetstack/cert-manager/pkg/controller/clusterissuers"
	"github.com/jetstack/cert-manager/pkg/issuer"
	"github.com/jetstack/cert-manager/pkg/util/kube"
)

const controllerAgentName = "cert-manager-controller"

// Run starts the cert-manager controller loops and blocks until stopCh is
// closed. If leader election is enabled, the controllers only start once
// this instance acquires leadership.
func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) {
	ctx, kubeCfg, err := buildControllerContext(opts)
	if err != nil {
		glog.Fatalf("%s", err.Error())
	}

	run := func(_ <-chan struct{}) {
		var wg sync.WaitGroup
		var controllers = make(map[string]controller.Interface)
		for n, fn := range controller.Known() {
			if ctx.Namespace != "" && n == clusterissuers.ControllerName {
				glog.Infof("Skipping ClusterIssuer controller as cert-manager is scoped to a single namespace")
				continue
			}
			controllers[n] = fn(ctx)
		}
		for n, fn := range controllers {
			wg.Add(1)
			go func(n string, fn controller.Interface) {
				defer wg.Done()
				glog.V(4).Infof("Starting %s controller", n)

				err := fn(5, stopCh)
				if err != nil {
					glog.Fatalf("error running %s controller: %s", n, err.Error())
				}
			}(n, fn)
		}
		glog.V(4).Infof("Starting shared informer factory")
		ctx.SharedInformerFactory.Start(stopCh)
		ctx.KubeSharedInformerFactory.Start(stopCh)
		wg.Wait()
		glog.Fatalf("Control loops exited")
	}

	if !opts.LeaderElect {
		run(stopCh)
		return
	}

	leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
	if err != nil {
		glog.Fatalf("error creating leader election client: %s", err.Error())
	}

	startLeaderElection(opts, leaderElectionClient, ctx.Recorder, run)
	panic("unreachable")
}

// buildControllerContext constructs the controller.Context shared by all
// controllers, along with the rest.Config used to build further clients.
func buildControllerContext(opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) {
	// Load the user's Kubernetes config
	kubeCfg, err := kube.KubeConfig(opts.APIServerHost)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating rest config: %s", err.Error())
	}

	// Create a cert-manager api client
	intcl, err := clientset.NewForConfig(kubeCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating internal group client: %s", err.Error())
	}

	// Create a Kubernetes api client
	cl, err := kubernetes.NewForConfig(kubeCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating kubernetes client: %s", err.Error())
	}

	// Add cert-manager types to the default Kubernetes Scheme so Events can be
	// logged properly
	intscheme.AddToScheme(scheme.Scheme)

	// Create event broadcaster
	glog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.V(4).Infof)
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cl.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerAgentName})

	// We only create SharedInformerFactories for the --namespace specified to
	// watch. If this namespace is blank (i.e. the default, watch all
	// namespaces) then the factories will watch all namespaces.
	// If it is specified, all operations relating to ClusterIssuer resources
	// should be disabled and thus we don't need to also create factories for
	// the --cluster-resource-namespace.
	sharedInformerFactory := informers.NewFilteredSharedInformerFactory(intcl, time.Second*30, opts.Namespace, nil)
	kubeSharedInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(cl, time.Second*30, opts.Namespace, nil)
	return &controller.Context{
		Client:                    cl,
		CMClient:                  intcl,
		Recorder:                  recorder,
		KubeSharedInformerFactory: kubeSharedInformerFactory,
		SharedInformerFactory:     sharedInformerFactory,
		IssuerFactory: issuer.NewFactory(&issuer.Context{
			Client:                    cl,
			CMClient:                  intcl,
			Recorder:                  recorder,
			KubeSharedInformerFactory: kubeSharedInformerFactory,
			SharedInformerFactory:     sharedInformerFactory,
			Namespace:                 opts.Namespace,
			ClusterResourceNamespace:  opts.ClusterResourceNamespace,
			ACMEHTTP01SolverImage:     opts.ACMEHTTP01SolverImage,
		}),
		Namespace:                opts.Namespace,
		ClusterResourceNamespace: opts.ClusterResourceNamespace,
	}, kubeCfg, nil
}

// startLeaderElection runs leader election against an Endpoints lock and
// invokes run once leadership is acquired. It blocks until leadership is
// lost, at which point the process exits.
func startLeaderElection(opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, run func(<-chan struct{})) {
	// Identity used to distinguish between multiple controller manager instances
	id, err := os.Hostname()
	if err != nil {
		glog.Fatalf("error getting hostname: %s", err.Error())
	}

	// Lock required for leader election
	rl := resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{
			Namespace: opts.LeaderElectionNamespace,
			Name:      "cert-manager-controller",
		},
		Client: leaderElectionClient.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      id + "-external-cert-manager-controller",
			EventRecorder: recorder,
		},
	}

	// Try to become the leader and start the controller manager loops
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          &rl,
		LeaseDuration: opts.LeaderElectionLeaseDuration,
		RenewDeadline: opts.LeaderElectionRenewDeadline,
		RetryPeriod:   opts.LeaderElectionRetryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
}