Use NewUnmanaged and separate caches for each controller
Signed-off-by: Richard Wall <richard.wall@jetstack.io>
This commit is contained in:
parent
bcff4edb0f
commit
81874895b0
@ -112,12 +112,12 @@ func (o InjectorControllerOptions) RunInjectorController(stopCh <-chan struct{})
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err := cainjector.RegisterSecretBased(mgr); err != nil {
|
||||
if err := cainjector.RegisterSecretBased(mgr, stopCh); err != nil {
|
||||
o.log.Error(err, "error registering core-only controllers")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err := cainjector.RegisterCertificateBased(mgr); err != nil {
|
||||
if err := cainjector.RegisterCertificateBased(mgr, stopCh); err != nil {
|
||||
o.log.Error(err, "error registering controllers")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cainjector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
logf "github.com/jetstack/cert-manager/pkg/logs"
|
||||
@ -27,6 +28,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
apireg "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
)
|
||||
|
||||
// injectorSet describes a particular setup of the injector controller
|
||||
@ -67,9 +72,9 @@ var (
|
||||
|
||||
// registerAllInjectors registers all injectors and based on the
|
||||
// graduation state of the injector decides how to log no kind/resource match errors
|
||||
func registerAllInjectors(mgr ctrl.Manager, sources ...caDataSource) error {
|
||||
func registerAllInjectors(mgr ctrl.Manager, stopCh <-chan struct{}, sources []caDataSource) error {
|
||||
for _, setup := range injectorSetups {
|
||||
if err := Register(mgr, setup, sources...); err != nil {
|
||||
if err := Register(mgr, stopCh, setup, sources); err != nil {
|
||||
if !meta.IsNoMatchError(err) || !setup.injector.IsAlpha() {
|
||||
return err
|
||||
}
|
||||
@ -82,22 +87,56 @@ func registerAllInjectors(mgr ctrl.Manager, sources ...caDataSource) error {
|
||||
}
|
||||
|
||||
// Register registers an injection controller with the given manager, and adds relevant indicies.
|
||||
func Register(mgr ctrl.Manager, setup injectorSetup, sources ...caDataSource) error {
|
||||
func Register(mgr ctrl.Manager, stopCh <-chan struct{}, setup injectorSetup, sources []caDataSource) error {
|
||||
typ := setup.injector.NewTarget().AsObject()
|
||||
builder := ctrl.NewControllerManagedBy(mgr).For(typ)
|
||||
|
||||
cacheOptions := cache.Options{
|
||||
Scheme: mgr.GetScheme(),
|
||||
Mapper: mgr.GetRESTMapper(),
|
||||
}
|
||||
ca, err := cache.New(mgr.GetConfig(), cacheOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating cache: %v", err)
|
||||
}
|
||||
|
||||
c, err := controller.NewUnmanaged(
|
||||
// strings.ToLower(typ.GetObjectKind().GroupVersionKind().Kind),
|
||||
"controller-xxx",
|
||||
mgr,
|
||||
controller.Options{
|
||||
Reconciler: &genericInjectReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
sources: sources,
|
||||
log: ctrl.Log.WithName("inject-controller"),
|
||||
resourceName: setup.resourceName,
|
||||
injector: setup.injector,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create controller: %v", err)
|
||||
}
|
||||
if err := c.Watch(source.NewKindWithCache(typ, ca), &handler.EnqueueRequestForObject{}); err != nil {
|
||||
return fmt.Errorf("unable to watch: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range sources {
|
||||
if err := s.ApplyTo(mgr, setup, builder); err != nil {
|
||||
if err := s.ApplyTo(mgr, setup, c, ca); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return builder.Complete(&genericInjectReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
sources: sources,
|
||||
log: ctrl.Log.WithName("inject-controller"),
|
||||
resourceName: setup.resourceName,
|
||||
injector: setup.injector,
|
||||
})
|
||||
go func() {
|
||||
<-mgr.Elected()
|
||||
if err := ca.Start(stopCh); err != nil {
|
||||
ctrl.Log.Error(err, "error starting cache")
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
<-mgr.Elected()
|
||||
if err := c.Start(stopCh); err != nil {
|
||||
ctrl.Log.Error(err, "error starting controller")
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// dataFromSliceOrFile returns data from the slice (if non-empty), or from the file,
|
||||
@ -121,11 +160,14 @@ func dataFromSliceOrFile(data []byte, file string) ([]byte, error) {
|
||||
// indices.
|
||||
// The registered controllers require the cert-manager API to be available
|
||||
// in order to run.
|
||||
func RegisterCertificateBased(mgr ctrl.Manager) error {
|
||||
sources := []caDataSource{
|
||||
&certificateDataSource{client: mgr.GetClient()},
|
||||
}
|
||||
return registerAllInjectors(mgr, sources...)
|
||||
func RegisterCertificateBased(mgr ctrl.Manager, stopCh <-chan struct{}) error {
|
||||
return registerAllInjectors(
|
||||
mgr,
|
||||
stopCh,
|
||||
[]caDataSource{
|
||||
&certificateDataSource{client: mgr.GetClient()},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// RegisterSecretBased registers all known injection controllers that
|
||||
@ -133,10 +175,13 @@ func RegisterCertificateBased(mgr ctrl.Manager) error {
|
||||
// indices.
|
||||
// The registered controllers only require the corev1 APi to be available in
|
||||
// order to run.
|
||||
func RegisterSecretBased(mgr ctrl.Manager) error {
|
||||
sources := []caDataSource{
|
||||
&secretDataSource{client: mgr.GetClient()},
|
||||
&kubeconfigDataSource{},
|
||||
}
|
||||
return registerAllInjectors(mgr, sources...)
|
||||
func RegisterSecretBased(mgr ctrl.Manager, stopCh <-chan struct{}) error {
|
||||
return registerAllInjectors(
|
||||
mgr,
|
||||
stopCh,
|
||||
[]caDataSource{
|
||||
&secretDataSource{client: mgr.GetClient()},
|
||||
&kubeconfigDataSource{},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@ -26,7 +26,9 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
|
||||
@ -52,8 +54,8 @@ type caDataSource interface {
|
||||
// failed to read.
|
||||
ReadCA(ctx context.Context, log logr.Logger, metaObj metav1.Object) (ca []byte, err error)
|
||||
|
||||
// ApplyTo applies any required watchers to the given controller builder.
|
||||
ApplyTo(mgr ctrl.Manager, setup injectorSetup, builder *ctrl.Builder) error
|
||||
// ApplyTo applies any required watchers to the given controller.
|
||||
ApplyTo(mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error
|
||||
}
|
||||
|
||||
// kubeconfigDataSource reads the ca bundle provided as part of the struct
|
||||
@ -71,7 +73,7 @@ func (c *kubeconfigDataSource) ReadCA(ctx context.Context, log logr.Logger, meta
|
||||
return c.apiserverCABundle, nil
|
||||
}
|
||||
|
||||
func (c *kubeconfigDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, builder *ctrl.Builder) error {
|
||||
func (c *kubeconfigDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, _ controller.Controller, _ cache.Cache) error {
|
||||
cfg := mgr.GetConfig()
|
||||
caBundle, err := dataFromSliceOrFile(cfg.CAData, cfg.CAFile)
|
||||
if err != nil {
|
||||
@ -140,26 +142,30 @@ func (c *certificateDataSource) ReadCA(ctx context.Context, log logr.Logger, met
|
||||
return caData, nil
|
||||
}
|
||||
|
||||
func (c *certificateDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, builder *ctrl.Builder) error {
|
||||
func (c *certificateDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error {
|
||||
typ := setup.injector.NewTarget().AsObject()
|
||||
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), typ, injectFromPath, injectableCAFromIndexer); err != nil {
|
||||
if err := ca.IndexField(context.TODO(), typ, injectFromPath, injectableCAFromIndexer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
builder.Watches(&source.Kind{Type: &cmapi.Certificate{}},
|
||||
if err := controller.Watch(source.NewKindWithCache(&cmapi.Certificate{}, ca),
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: &certMapper{
|
||||
Client: mgr.GetClient(),
|
||||
log: ctrl.Log.WithName("cert-mapper"),
|
||||
toInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
|
||||
}},
|
||||
).
|
||||
Watches(&source.Kind{Type: &corev1.Secret{}},
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: &secretForCertificateMapper{
|
||||
Client: mgr.GetClient(),
|
||||
log: ctrl.Log.WithName("secret-for-certificate-mapper"),
|
||||
certificateToInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
|
||||
}},
|
||||
)
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := controller.Watch(source.NewKindWithCache(&corev1.Secret{}, ca),
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: &secretForCertificateMapper{
|
||||
Client: mgr.GetClient(),
|
||||
log: ctrl.Log.WithName("secret-for-certificate-mapper"),
|
||||
certificateToInjectable: buildCertToInjectableFunc(setup.listType, setup.resourceName),
|
||||
}},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -213,18 +219,16 @@ func (c *secretDataSource) ReadCA(ctx context.Context, log logr.Logger, metaObj
|
||||
return caData, nil
|
||||
}
|
||||
|
||||
func (c *secretDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, builder *ctrl.Builder) error {
|
||||
func (c *secretDataSource) ApplyTo(mgr ctrl.Manager, setup injectorSetup, controller controller.Controller, ca cache.Cache) error {
|
||||
typ := setup.injector.NewTarget().AsObject()
|
||||
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), typ, injectFromSecretPath, injectableCAFromSecretIndexer); err != nil {
|
||||
if err := ca.IndexField(context.TODO(), typ, injectFromSecretPath, injectableCAFromSecretIndexer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
builder.Watches(&source.Kind{Type: &corev1.Secret{}},
|
||||
return controller.Watch(source.NewKindWithCache(&corev1.Secret{}, ca),
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: &secretForInjectableMapper{
|
||||
Client: mgr.GetClient(),
|
||||
log: ctrl.Log.WithName("secret-mapper"),
|
||||
secretToInjectable: buildSecretToInjectableFunc(setup.listType, setup.resourceName),
|
||||
}},
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user