Remove the double cache mechanism for cainjector

Signed-off-by: irbekrm <irbekrm@gmail.com>
This commit is contained in:
irbekrm 2023-01-23 17:38:46 +00:00
parent 1038ca4494
commit 4776597cb4
6 changed files with 89 additions and 295 deletions

View File

@ -187,58 +187,13 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) er
})
}
g.Go(func() (err error) {
defer func() {
o.log.Error(err, "manager goroutine exited")
}()
if err = mgr.Start(gctx); err != nil {
return fmt.Errorf("error running manager: %v", err)
}
return nil
})
select {
case <-gctx.Done(): // Exit early if we are shutting down or if the manager has exited with an error
// Wait for error group to complete and return
return g.Wait()
case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader
// Continue with setting up controller
// TODO: make the controllers to be started optional and make it possible to parameterize whether certs are watched
err = cainjector.RegisterAllInjectors(gctx, mgr, o.Namespace)
if err != nil {
o.log.Error(err, "failed to register controllers", err)
}
// Retry the start up of the certificate based controller in case the
// cert-manager CRDs have not been installed yet or in case the CRD API is
// not working. E.g. The conversion webhook has not yet had its CA bundle
// injected by the secret based controller, which is launched in its own
// goroutine.
// When shutting down, return the last error if there is one.
// Never retry if the controller exits cleanly.
g.Go(func() (err error) {
for {
err = cainjector.RegisterCertificateBased(gctx, mgr, o.Namespace)
if err == nil {
return
}
o.log.Error(err, "Error registering certificate based controllers. Retrying after 5 seconds.")
select {
case <-time.After(time.Second * 5):
case <-gctx.Done():
return
}
}
})
// Secrets based controller is started in its own goroutine so that it can
// perform injection of the CA bundle into any webhooks required by the
// cert-manager CRD API.
// We do not retry this controller because it only interacts with core APIs
// which should always be in a working state.
g.Go(func() (err error) {
if err = cainjector.RegisterSecretBased(gctx, mgr, o.Namespace); err != nil {
return fmt.Errorf("error registering secret controller: %v", err)
}
return
})
return g.Wait()
if err = mgr.Start(gctx); err != nil {
return fmt.Errorf("error running manager: %v", err)
}
return nil
}

View File

@ -36,6 +36,7 @@ import (
// dropNotFound ignores the given error if it's a not-found error,
// but otherwise just returns the argument.
// TODO: we don't use this pattern anywhere else in this project so probably doesn't make sense here either
func dropNotFound(err error) error {
if apierrors.IsNotFound(err) {
return nil
@ -73,6 +74,7 @@ type InjectTarget interface {
// Injectable is a point in a Kubernetes API object that represents a Kubernetes Service
// reference with a corresponding spot for a CA bundle.
// TODO: either add some actual functionality or remove this empty interface
type Injectable interface {
}
@ -85,8 +87,6 @@ type Injectable interface {
type CertInjector interface {
// NewTarget creates a new InjectTarget containing an empty underlying object.
NewTarget() InjectTarget
// IsAlpha tells the client to disregard "no matching kind" type of errors
IsAlpha() bool
}
// genericInjectReconciler is a reconciler that knows how to check if a given object is
@ -127,6 +127,7 @@ func splitNamespacedName(nameStr string) types.NamespacedName {
func (r *genericInjectReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.log.WithValues(r.resourceName, req.NamespacedName)
log.V(logf.DebugLevel).Info("Parsing injectable")
// fetch the target object
target := r.injector.NewTarget()

View File

@ -139,11 +139,12 @@ func injectableCAFromIndexer(rawObj client.Object) []string {
// (webhooks, api services, etc) that reference it.
type secretToInjectableFunc func(log logr.Logger, cl client.Reader, certName types.NamespacedName) []ctrl.Request
// buildSecretToInjectableFunc creates a certificateToInjectableFunc that maps from certificates to the given type of injectable.
// buildSecretToInjectableFunc creates a certificateToInjectableFunc that maps from secrets to the given type of injectable.
func buildSecretToInjectableFunc(listTyp runtime.Object, resourceName string) secretToInjectableFunc {
return func(log logr.Logger, cl client.Reader, secretName types.NamespacedName) []ctrl.Request {
log = log.WithValues("type", resourceName)
objs := listTyp.DeepCopyObject().(client.ObjectList)
// TODO: ensure that this is cache lister, not a direct client
if err := cl.List(context.Background(), objs, client.MatchingFields{injectFromSecretPath: secretName.String()}); err != nil {
log.Error(err, "unable to fetch injectables associated with secret")
return nil

View File

@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// TODO: consider Go generics for all this stuff
// this contains implementations of CertInjector (and dependents)
// for various Kubernetes types that contain CA bundles.
// This allows us to build a generic "injection" controller, and parameterize
@ -32,10 +33,6 @@ import (
// mutatingWebhookInjector knows how to create an InjectTarget a MutatingWebhookConfiguration.
type mutatingWebhookInjector struct{}
func (i mutatingWebhookInjector) IsAlpha() bool {
return false
}
func (i mutatingWebhookInjector) NewTarget() InjectTarget {
return &mutatingWebhookTarget{}
}
@ -62,10 +59,6 @@ func (i validatingWebhookInjector) NewTarget() InjectTarget {
return &validatingWebhookTarget{}
}
func (i validatingWebhookInjector) IsAlpha() bool {
return false
}
// validatingWebhookTarget knows how to set CA data for all the webhooks
// in a validatingWebhookConfiguration.
type validatingWebhookTarget struct {
@ -89,10 +82,6 @@ func (i apiServiceInjector) NewTarget() InjectTarget {
return &apiServiceTarget{}
}
func (i apiServiceInjector) IsAlpha() bool {
return false
}
// apiServiceTarget knows how to set CA data for the CA bundle in
// the APIService.
type apiServiceTarget struct {
@ -115,10 +104,6 @@ func (i crdConversionInjector) NewTarget() InjectTarget {
return &crdConversionTarget{}
}
func (i crdConversionInjector) IsAlpha() bool {
return false
}
// crdConversionTarget knows how to set CA data for the conversion webhook in CRDs
type crdConversionTarget struct {
obj apiext.CustomResourceDefinition

View File

@ -21,22 +21,17 @@ import (
"fmt"
"os"
logf "github.com/cert-manager/cert-manager/pkg/logs"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
admissionreg "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
apireg "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
)
// injectorSet describes a particular setup of the injector controller
@ -44,6 +39,7 @@ type injectorSetup struct {
resourceName string
injector CertInjector
listType runtime.Object
objType client.Object
}
var (
@ -51,113 +47,109 @@ var (
resourceName: "mutatingwebhookconfiguration",
injector: mutatingWebhookInjector{},
listType: &admissionreg.MutatingWebhookConfigurationList{},
objType: &admissionreg.MutatingWebhookConfiguration{},
}
ValidatingWebhookSetup = injectorSetup{
resourceName: "validatingwebhookconfiguration",
injector: validatingWebhookInjector{},
listType: &admissionreg.ValidatingWebhookConfigurationList{},
objType: &admissionreg.ValidatingWebhookConfiguration{},
}
APIServiceSetup = injectorSetup{
resourceName: "apiservice",
injector: apiServiceInjector{},
listType: &apireg.APIServiceList{},
objType: &apireg.APIService{},
}
CRDSetup = injectorSetup{
resourceName: "customresourcedefinition",
injector: crdConversionInjector{},
listType: &apiext.CustomResourceDefinitionList{},
objType: &apiext.CustomResourceDefinition{},
}
injectorSetups = []injectorSetup{MutatingWebhookSetup, ValidatingWebhookSetup, APIServiceSetup, CRDSetup}
ControllerNames []string
injectorSetups = []injectorSetup{MutatingWebhookSetup, ValidatingWebhookSetup, APIServiceSetup, CRDSetup}
)
// registerAllInjectors registers all injectors and based on the
// graduation state of the injector decides how to log no kind/resource match errors
func registerAllInjectors(ctx context.Context, groupName string, mgr ctrl.Manager, sources []caDataSource, client client.Client, ca cache.Cache, namespace string) error {
controllers := make([]controller.Controller, len(injectorSetups))
for i, setup := range injectorSetups {
controller, err := newGenericInjectionController(ctx, groupName, mgr, setup, sources, ca, client, namespace)
if err != nil {
if !meta.IsNoMatchError(err) || !setup.injector.IsAlpha() {
return err
}
ctrl.Log.V(logf.WarnLevel).Info("unable to register injector which is still in an alpha phase."+
" Enable the feature on the API server in order to use this injector",
"injector", setup.resourceName)
}
controllers[i] = controller
func RegisterAllInjectors(ctx context.Context, mgr ctrl.Manager, namespace string) error {
// TODO: refactor
sds := &secretDataSource{
client: mgr.GetClient(),
}
g, gctx := errgroup.WithContext(ctx)
cds := &certificateDataSource{
client: mgr.GetClient(),
}
cfg := mgr.GetConfig()
caBundle, err := dataFromSliceOrFile(cfg.CAData, cfg.CAFile)
if err != nil {
return err
}
kds := &kubeconfigDataSource{
apiserverCABundle: caBundle,
}
// Registers a c/r controller for each of APIService, CustomResourceDefinition, Mutating/ValidatingWebhookConfiguration
// TODO: add a flag to allow users to configure which of these controllers should be registered
for _, setup := range injectorSetups {
log := ctrl.Log.WithName(setup.objType.GetName())
log.Info("Registering new controller")
r := &genericInjectReconciler{
injector: setup.injector,
namespace: namespace,
log: log,
Client: mgr.GetClient(),
// TODO: refactor
sources: []caDataSource{
sds,
cds,
kds,
},
}
g.Go(func() (err error) {
if err = ca.Start(gctx); err != nil {
// This code does some magic to make it possible to filter
// injectables by whether they have the annotations we're
// interested in when determining whether to trigger reconcilers
secretTyp := setup.injector.NewTarget().AsObject()
if err := mgr.GetFieldIndexer().IndexField(ctx, secretTyp, injectFromSecretPath, injectableCAFromSecretIndexer); err != nil {
err := fmt.Errorf("error making injectable indexable by inject-ca-from-secret annotation: %w", err)
return err
}
return nil
})
if ca.WaitForCacheSync(gctx) {
for _, controller := range controllers {
if gctx.Err() != nil {
break
}
controller := controller
g.Go(func() (err error) {
return controller.Start(gctx)
})
certTyp := setup.injector.NewTarget().AsObject()
if err := mgr.GetFieldIndexer().IndexField(ctx, certTyp, injectFromPath, injectableCAFromIndexer); err != nil {
err := fmt.Errorf("error making injectable indexable by inject-ca-from path: %w", err)
return err
}
} else {
// I assume that if the cache sync fails, then the already-started cache
// will exit with a meaningful error which will be returned by the errgroup
ctrl.Log.Error(nil, "timed out or failed while waiting for cache")
}
return g.Wait()
}
// newGenericInjectionController creates a controller and adds relevant watches
// and indexers to the supplied cache.
// TODO: We can't use the controller-runtime controller.Builder mechanism here
// because it doesn't allow us to specify the cache to which we link watches,
// indexes and event sources. Keep checking new controller-runtime releases for
// improvements which might make this easier:
// * https://github.com/kubernetes-sigs/controller-runtime/issues/764
func newGenericInjectionController(ctx context.Context, groupName string, mgr ctrl.Manager,
setup injectorSetup, sources []caDataSource, ca cache.Cache,
client client.Client, namespace string) (controller.Controller, error) {
log := ctrl.Log.WithName(groupName).WithName(setup.resourceName)
typ := setup.injector.NewTarget().AsObject()
c, err := controller.NewUnmanaged(
fmt.Sprintf("controller-for-%s-%s", groupName, setup.resourceName),
mgr,
controller.Options{
Reconciler: &genericInjectReconciler{
Client: client,
sources: sources,
log: log.WithName("generic-inject-reconciler"),
resourceName: setup.resourceName,
injector: setup.injector,
namespace: namespace,
},
LogConstructor: func(request *reconcile.Request) logr.Logger { return log },
})
if err != nil {
return nil, err
}
if err := c.Watch(source.NewKindWithCache(typ, ca), &handler.EnqueueRequestForObject{}); err != nil {
return nil, err
}
for _, s := range sources {
if err := s.ApplyTo(ctx, mgr, setup, c, ca); err != nil {
return nil, err
if err := ctrl.NewControllerManagedBy(mgr).
For(setup.objType).
Watches(&source.Kind{Type: new(corev1.Secret)}, handler.EnqueueRequestsFromMapFunc((&secretForInjectableMapper{
Client: mgr.GetClient(),
log: log,
secretToInjectable: buildSecretToInjectableFunc(setup.listType, setup.resourceName),
}).Map)).
Watches(&source.Kind{Type: new(corev1.Secret)}, handler.EnqueueRequestsFromMapFunc((&secretForCertificateMapper{
Client: mgr.GetClient(),
log: log,
certificateToInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
}).Map)).
// TODO: make this bit optional
Watches(&source.Kind{Type: new(cmapi.Certificate)},
handler.EnqueueRequestsFromMapFunc((&certMapper{
Client: mgr.GetClient(),
log: log,
toInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
}).Map)).
Complete(r); err != nil {
err = fmt.Errorf("error registering controller for %s: %w", setup.objType.GetName(), err)
return err
}
}
return c, nil
return nil
}
// dataFromSliceOrFile returns data from the slice (if non-empty), or from the file,
@ -175,81 +167,3 @@ func dataFromSliceOrFile(data []byte, file string) ([]byte, error) {
}
return nil, nil
}
// RegisterCertificateBased registers all known injection controllers that
// target Certificate resources with the given manager, and adds relevant
// indices.
// The registered controllers require the cert-manager API to be available
// in order to run.
func RegisterCertificateBased(ctx context.Context, mgr ctrl.Manager, namespace string) error {
cache, client, err := newIndependentCacheAndDelegatingClient(mgr, namespace)
if err != nil {
return err
}
return registerAllInjectors(
ctx,
"certificate",
mgr,
[]caDataSource{
&certificateDataSource{client: cache},
},
client,
cache,
namespace,
)
}
// RegisterSecretBased registers all known injection controllers that
// target Secret resources with the given manager, and adds relevant
// indices.
// The registered controllers only require the corev1 APi to be available in
// order to run.
func RegisterSecretBased(ctx context.Context, mgr ctrl.Manager, namespace string) error {
cache, client, err := newIndependentCacheAndDelegatingClient(mgr, namespace)
if err != nil {
return err
}
return registerAllInjectors(
ctx,
"secret",
mgr,
[]caDataSource{
&secretDataSource{client: cache},
&kubeconfigDataSource{},
},
client,
cache,
namespace,
)
}
// newIndependentCacheAndDelegatingClient creates a cache and a delegating
// client which are independent of the cache of the manager.
// This allows us to start the manager and secrets based injectors before the
// cert-manager Certificates CRDs have been installed and before the CA bundles
// have been injected into the cert-manager CRDs, by the secrets based injector,
// which is running in a separate goroutine.
func newIndependentCacheAndDelegatingClient(mgr ctrl.Manager, namespace string) (cache.Cache, client.Client, error) {
cacheOptions := cache.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
}
if namespace != "" {
cacheOptions.Namespace = namespace
}
ca, err := cache.New(mgr.GetConfig(), cacheOptions)
if err != nil {
return nil, nil, err
}
clientOptions := client.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
}
client, err := cluster.DefaultNewClient(ca, mgr.GetConfig(), clientOptions)
if err != nil {
return nil, nil, err
}
return ca, client, nil
}

View File

@ -25,12 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
@ -54,9 +49,6 @@ type caDataSource interface {
// It is up to the ReadCA implementation to inform the user why the CA
// failed to read.
ReadCA(ctx context.Context, log logr.Logger, metaObj metav1.Object, namespace string) (ca []byte, err error)
// ApplyTo applies any required watchers to the given controller.
ApplyTo(ctx context.Context, mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error
}
// kubeconfigDataSource reads the ca bundle provided as part of the struct
@ -74,16 +66,6 @@ func (c *kubeconfigDataSource) ReadCA(ctx context.Context, log logr.Logger, meta
return c.apiserverCABundle, nil
}
func (c *kubeconfigDataSource) ApplyTo(ctx context.Context, mgr ctrl.Manager, setup injectorSetup, _ controller.Controller, _ cache.Cache) error {
cfg := mgr.GetConfig()
caBundle, err := dataFromSliceOrFile(cfg.CAData, cfg.CAFile)
if err != nil {
return err
}
c.apiserverCABundle = caBundle
return nil
}
// certificateDataSource reads a CA bundle by fetching the Certificate named in
// the 'cert-manager.io/inject-ca-from' annotation in the form
// 'namespace/name'.
@ -150,33 +132,6 @@ func (c *certificateDataSource) ReadCA(ctx context.Context, log logr.Logger, met
return caData, nil
}
func (c *certificateDataSource) ApplyTo(ctx context.Context, mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error {
typ := setup.injector.NewTarget().AsObject()
if err := ca.IndexField(ctx, typ, injectFromPath, injectableCAFromIndexer); err != nil {
return err
}
if err := controller.Watch(source.NewKindWithCache(&cmapi.Certificate{}, ca),
handler.EnqueueRequestsFromMapFunc((&certMapper{
Client: ca,
log: ctrl.Log.WithName("cert-mapper"),
toInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
}).Map),
); err != nil {
return err
}
if err := controller.Watch(source.NewKindWithCache(&corev1.Secret{}, ca),
handler.EnqueueRequestsFromMapFunc((&secretForCertificateMapper{
Client: ca,
log: ctrl.Log.WithName("secret-for-certificate-mapper"),
certificateToInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
}).Map),
); err != nil {
return err
}
return nil
}
// secretDataSource reads a CA bundle from a Secret resource named using the
// 'cert-manager.io/inject-ca-from-secret' annotation in the form
// 'namespace/name'.
@ -234,20 +189,3 @@ func (c *secretDataSource) ReadCA(ctx context.Context, log logr.Logger, metaObj
return caData, nil
}
func (c *secretDataSource) ApplyTo(ctx context.Context, mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error {
typ := setup.injector.NewTarget().AsObject()
if err := ca.IndexField(ctx, typ, injectFromSecretPath, injectableCAFromSecretIndexer); err != nil {
return err
}
if err := controller.Watch(source.NewKindWithCache(&corev1.Secret{}, ca),
handler.EnqueueRequestsFromMapFunc((&secretForInjectableMapper{
Client: ca,
log: ctrl.Log.WithName("secret-mapper"),
secretToInjectable: buildSecretToInjectableFunc(setup.listType, setup.resourceName),
}).Map),
); err != nil {
return err
}
return nil
}