/*
Copyright 2020 The cert-manager Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"

	"golang.org/x/sync/errgroup"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/clock"
	gwapi "sigs.k8s.io/gateway-api/apis/v1alpha1"
	gwclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
	gwscheme "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/scheme"
	gwinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"

	"github.com/jetstack/cert-manager/cmd/controller/app/options"
	cmdutil "github.com/jetstack/cert-manager/cmd/util"
	"github.com/jetstack/cert-manager/pkg/acme/accounts"
	clientset "github.com/jetstack/cert-manager/pkg/client/clientset/versioned"
	intscheme "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/scheme"
	informers "github.com/jetstack/cert-manager/pkg/client/informers/externalversions"
	"github.com/jetstack/cert-manager/pkg/controller"
	"github.com/jetstack/cert-manager/pkg/controller/clusterissuers"
	"github.com/jetstack/cert-manager/pkg/feature"
	dnsutil "github.com/jetstack/cert-manager/pkg/issuer/acme/dns/util"
	logf "github.com/jetstack/cert-manager/pkg/logs"
	"github.com/jetstack/cert-manager/pkg/metrics"
	"github.com/jetstack/cert-manager/pkg/util"
	utilfeature "github.com/jetstack/cert-manager/pkg/util/feature"
	"github.com/jetstack/cert-manager/pkg/util/profiling"
)

const controllerAgentName = "cert-manager"

// This sets the informer's resync period to 10 hours
// following the controller-runtime defaults
// and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629
const resyncPeriod = 10 * time.Hour
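
// Run starts the cert-manager controller manager. It wires up the metrics
// server, the optional pprof profiler, leader election and every enabled
// controller, and blocks until stopCh is closed or a component fails.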
func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
	rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh)
	rootCtx, cancelContext := context.WithCancel(rootCtx)
	defer cancelContext()
	g, rootCtx := errgroup.WithContext(rootCtx)
	rootCtx = logf.NewContext(rootCtx, nil, "controller")
	log := logf.FromContext(rootCtx)

	ctx, kubeCfg, err := buildControllerContext(rootCtx, opts)
	if err != nil {
		return fmt.Errorf("error building controller context (options %v): %v", opts, err)
	}

	enabledControllers := opts.EnabledControllers()
	log.Info(fmt.Sprintf("enabled controllers: %s", enabledControllers.List()))

	// Start metrics server
	metricsLn, err := net.Listen("tcp", opts.MetricsListenAddress)
	if err != nil {
		return fmt.Errorf("failed to listen on prometheus address %s: %v", opts.MetricsListenAddress, err)
	}
	metricsServer := ctx.Metrics.NewServer(metricsLn)
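
	// Run the metrics server on the errgroup: one goroutine shuts it down
	// gracefully once rootCtx is cancelled, the other serves until closed.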
	g.Go(func() error {
		<-rootCtx.Done()
		// allow a timeout for graceful shutdown
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := metricsServer.Shutdown(ctx); err != nil {
			return err
		}
		return nil
	})
	g.Go(func() error {
		log.V(logf.InfoLevel).Info("starting metrics server", "address", metricsLn.Addr())
		if err := metricsServer.Serve(metricsLn); err != http.ErrServerClosed {
			return err
		}
		return nil
	})

	// Start profiler if it is enabled
	if opts.EnablePprof {
		profilerLn, err := net.Listen("tcp", opts.PprofAddress)
		if err != nil {
			return fmt.Errorf("failed to listen on profiler address %s: %v", opts.PprofAddress, err)
		}
		profilerMux := http.NewServeMux()
		// Add pprof endpoints to this mux
		profiling.Install(profilerMux)
		profilerServer := &http.Server{
			Handler: profilerMux,
		}

		g.Go(func() error {
			<-rootCtx.Done()
			// allow a timeout for graceful shutdown
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			if err := profilerServer.Shutdown(ctx); err != nil {
				return err
			}
			return nil
		})
		g.Go(func() error {
			log.V(logf.InfoLevel).Info("starting profiler", "address", profilerLn.Addr())
			if err := profilerServer.Serve(profilerLn); err != http.ErrServerClosed {
				return err
			}
			return nil
		})
	}
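
	// elected is closed once this instance has acquired leadership, or
	// immediately when leader election is disabled; controllers are only
	// started after it has been closed.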
	elected := make(chan struct{})
	if opts.LeaderElect {
		g.Go(func() error {
			log.V(logf.InfoLevel).Info("starting leader election")
			leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election"))
			if err != nil {
				return fmt.Errorf("error creating leader election client: %v", err)
			}

			errorCh := make(chan error, 1)
			if err := startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, leaderelection.LeaderCallbacks{
				OnStartedLeading: func(_ context.Context) {
					close(elected)
				},
				OnStoppedLeading: func() {
					select {
					case <-rootCtx.Done():
						// context was canceled, just return
						return
					default:
						errorCh <- errors.New("leader election lost")
					}
				},
			}); err != nil {
				return err
			}

			select {
			case err := <-errorCh:
				return err
			default:
				return nil
			}
		})
	} else {
		close(elected)
	}

	select {
	case <-rootCtx.Done(): // Exit early if we are shutting down or if the errgroup has already exited with an error
		// Wait for error group to complete and return
		return g.Wait()
	case <-elected: // Don't launch the controllers unless we have been elected leader
		// Continue with setting up controller
	}
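
	// Construct every known, enabled controller and launch each one in its
	// own goroutine on the errgroup.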
	for n, fn := range controller.Known() {
		log := log.WithValues("controller", n)

		// only run a controller if it's been enabled
		if !enabledControllers.Has(n) {
			log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
			continue
		}

		// don't run clusterissuers controller if scoped to a single namespace
		if ctx.Namespace != "" && n == clusterissuers.ControllerName {
			log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace")
			continue
		}

		iface, err := fn(ctx)
		if err != nil {
			err = fmt.Errorf("error starting controller: %v", err)

			cancelContext()
			err2 := g.Wait() // Don't process errors, we already have an error
			if err2 != nil {
				return utilerrors.NewAggregate([]error{err, err2})
			}
			return err
		}

		g.Go(func() error {
			log.V(logf.InfoLevel).Info("starting controller")

			// TODO: make this either a constant or a command line flag
			workers := 5
			return iface.Run(workers, rootCtx.Done())
		})
	}
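
	// The informer factories are started only after all controllers have been
	// constructed above, so that every informer they registered is included.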
	log.V(logf.DebugLevel).Info("starting shared informer factories")
	ctx.SharedInformerFactory.Start(rootCtx.Done())
	ctx.KubeSharedInformerFactory.Start(rootCtx.Done())

	if utilfeature.DefaultFeatureGate.Enabled(feature.ExperimentalGatewayAPISupport) {
		ctx.GWShared.Start(rootCtx.Done())
	}

	err = g.Wait()
	if err != nil {
		return fmt.Errorf("error starting controller: %v", err)
	}
	log.V(logf.InfoLevel).Info("control loops exited")

	return nil
}
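
// buildControllerContext loads the kubeconfig, builds the Kubernetes,
// cert-manager and gateway-api clients and informer factories, and assembles
// the controller.Context shared by all control loops.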
func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) {
	log := logf.FromContext(ctx, "build-context")
	// Load the user's Kubernetes config
	kubeCfg, err := clientcmd.BuildConfigFromFlags(opts.APIServerHost, opts.Kubeconfig)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating rest config: %s", err.Error())
	}

	kubeCfg.QPS = opts.KubernetesAPIQPS
	kubeCfg.Burst = opts.KubernetesAPIBurst

	// Add User-Agent to client
	kubeCfg = rest.AddUserAgent(kubeCfg, util.CertManagerUserAgent)

	// Create a cert-manager api client
	intcl, err := clientset.NewForConfig(kubeCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating internal group client: %s", err.Error())
	}

	// Create a Kubernetes api client
	cl, err := kubernetes.NewForConfig(kubeCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating kubernetes client: %s", err.Error())
	}

	var gatewayAvailable bool
	// Check if the Gateway API feature gate was enabled
	if utilfeature.DefaultFeatureGate.Enabled(feature.ExperimentalGatewayAPISupport) {
		// check if the gateway API CRDs are available. If they are not found, return an error
		// which will cause cert-manager to crashloopbackoff
		d := cl.Discovery()
		resources, err := d.ServerResourcesForGroupVersion(gwapi.GroupVersion.String())
		var GatewayAPINotAvailable = "the Gateway API CRDs do not seem to be present, but " + feature.ExperimentalGatewayAPISupport +
			" is set to true. Please install the gateway-api CRDs."
		switch {
		case apierrors.IsNotFound(err):
			return nil, nil, fmt.Errorf("%s (%w)", GatewayAPINotAvailable, err)
		case err != nil:
			return nil, nil, fmt.Errorf("while checking if the Gateway API CRD is installed: %w", err)
		case len(resources.APIResources) == 0:
			return nil, nil, fmt.Errorf("%s (found %d APIResources in %s)", GatewayAPINotAvailable, len(resources.APIResources), gwapi.GroupVersion.String())
		default:
			gatewayAvailable = true
		}
	}

	// Create a GatewayAPI client.
	gwcl, err := gwclient.NewForConfig(kubeCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating gateway api client: %s", err.Error())
	}

	nameservers := opts.DNS01RecursiveNameservers
	if len(nameservers) == 0 {
		nameservers = dnsutil.RecursiveNameservers
	}
	log.V(logf.InfoLevel).WithValues("nameservers", nameservers).Info("configured acme dns01 nameservers")
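
	// Parse the resource requests and limits configured for ACME HTTP-01
	// solver pods up front, so that invalid quantities are rejected at startup.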
	HTTP01SolverResourceRequestCPU, err := resource.ParseQuantity(opts.ACMEHTTP01SolverResourceRequestCPU)
	if err != nil {
		return nil, nil, fmt.Errorf("error parsing ACMEHTTP01SolverResourceRequestCPU: %s", err.Error())
	}

	HTTP01SolverResourceRequestMemory, err := resource.ParseQuantity(opts.ACMEHTTP01SolverResourceRequestMemory)
	if err != nil {
		return nil, nil, fmt.Errorf("error parsing ACMEHTTP01SolverResourceRequestMemory: %s", err.Error())
	}

	HTTP01SolverResourceLimitsCPU, err := resource.ParseQuantity(opts.ACMEHTTP01SolverResourceLimitsCPU)
	if err != nil {
		return nil, nil, fmt.Errorf("error parsing ACMEHTTP01SolverResourceLimitsCPU: %s", err.Error())
	}

	HTTP01SolverResourceLimitsMemory, err := resource.ParseQuantity(opts.ACMEHTTP01SolverResourceLimitsMemory)
	if err != nil {
		return nil, nil, fmt.Errorf("error parsing ACMEHTTP01SolverResourceLimitsMemory: %s", err.Error())
	}

	// Create event broadcaster
	// Add cert-manager types to the default Kubernetes Scheme so Events can be
	// logged properly
	intscheme.AddToScheme(scheme.Scheme)
	gwscheme.AddToScheme(scheme.Scheme)
	log.V(logf.DebugLevel).Info("creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logf.WithInfof(log.V(logf.DebugLevel)).Infof)
	eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: cl.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, informers.WithNamespace(opts.Namespace))
	kubeSharedInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(cl, resyncPeriod, kubeinformers.WithNamespace(opts.Namespace))
	gwSharedInformerFactory := gwinformers.NewSharedInformerFactoryWithOptions(gwcl, resyncPeriod, gwinformers.WithNamespace(opts.Namespace))

	acmeAccountRegistry := accounts.NewDefaultRegistry()
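
	// Assemble the controller context shared by every control loop.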
	return &controller.Context{
		RootContext:               ctx,
		StopCh:                    ctx.Done(),
		RESTConfig:                kubeCfg,
		Client:                    cl,
		CMClient:                  intcl,
		GWClient:                  gwcl,
		DiscoveryClient:           cl.Discovery(),
		Recorder:                  recorder,
		KubeSharedInformerFactory: kubeSharedInformerFactory,
		SharedInformerFactory:     sharedInformerFactory,
		GWShared:                  gwSharedInformerFactory,
		GatewaySolverEnabled:      gatewayAvailable,
		Namespace:                 opts.Namespace,
		Clock:                     clock.RealClock{},
		Metrics:                   metrics.New(log, clock.RealClock{}),
		ACMEOptions: controller.ACMEOptions{
			HTTP01SolverImage:                 opts.ACMEHTTP01SolverImage,
			HTTP01SolverResourceRequestCPU:    HTTP01SolverResourceRequestCPU,
			HTTP01SolverResourceRequestMemory: HTTP01SolverResourceRequestMemory,
			HTTP01SolverResourceLimitsCPU:     HTTP01SolverResourceLimitsCPU,
			HTTP01SolverResourceLimitsMemory:  HTTP01SolverResourceLimitsMemory,
			DNS01CheckAuthoritative:           !opts.DNS01RecursiveNameserversOnly,
			DNS01Nameservers:                  nameservers,
			AccountRegistry:                   acmeAccountRegistry,
			DNS01CheckRetryPeriod:             opts.DNS01CheckRetryPeriod,
		},
		IssuerOptions: controller.IssuerOptions{
			ClusterIssuerAmbientCredentials: opts.ClusterIssuerAmbientCredentials,
			IssuerAmbientCredentials:        opts.IssuerAmbientCredentials,
			ClusterResourceNamespace:        opts.ClusterResourceNamespace,
		},
		IngressShimOptions: controller.IngressShimOptions{
			DefaultIssuerName:                 opts.DefaultIssuerName,
			DefaultIssuerKind:                 opts.DefaultIssuerKind,
			DefaultIssuerGroup:                opts.DefaultIssuerGroup,
			DefaultAutoCertificateAnnotations: opts.DefaultAutoCertificateAnnotations,
		},
		CertificateOptions: controller.CertificateOptions{
			EnableOwnerRef:           opts.EnableCertificateOwnerRef,
			CopiedAnnotationPrefixes: opts.CopiedAnnotationPrefixes,
		},
		SchedulerOptions: controller.SchedulerOptions{
			MaxConcurrentChallenges: opts.MaxConcurrentChallenges,
		},
	}, kubeCfg, nil
}
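
// startLeaderElection acquires the cert-manager-controller leader election
// lock and blocks, invoking the supplied callbacks as leadership is gained
// and lost, until ctx is cancelled.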
func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error {
	// Identity used to distinguish between multiple controller manager instances
	id, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("error getting hostname: %v", err)
	}

	// Set up Multilock for leader election. This Multilock is here for the
	// transitionary period from configmaps to leases, see
	// https://github.com/kubernetes-sigs/controller-runtime/pull/1144#discussion_r480173688
	lockName := "cert-manager-controller"
	lc := resourcelock.ResourceLockConfig{
		Identity:      id + "-external-cert-manager-controller",
		EventRecorder: recorder,
	}
	ml, err := resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock,
		opts.LeaderElectionNamespace,
		lockName,
		leaderElectionClient.CoreV1(),
		leaderElectionClient.CoordinationV1(),
		lc,
	)
	if err != nil {
		return fmt.Errorf("error creating leader election lock: %v", err)
	}

	// Try and become the leader and start controller manager loops
	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:            ml,
		LeaseDuration:   opts.LeaderElectionLeaseDuration,
		RenewDeadline:   opts.LeaderElectionRenewDeadline,
		RetryPeriod:     opts.LeaderElectionRetryPeriod,
		ReleaseOnCancel: true,
		Callbacks:       callbacks,
	})
	if err != nil {
		return err
	}

	le.Run(ctx)

	return nil
}