Add /livez endpoint which reports the leaderElection status
Signed-off-by: Richard Wall <richard.wall@jetstack.io>
Parent: 0f9e558e63, commit: 4d182e9c7b
@@ -40,6 +40,7 @@ import (
    "github.com/cert-manager/cert-manager/pkg/acme/accounts"
    "github.com/cert-manager/cert-manager/pkg/controller"
    "github.com/cert-manager/cert-manager/pkg/controller/clusterissuers"
    "github.com/cert-manager/cert-manager/pkg/healthz"
    dnsutil "github.com/cert-manager/cert-manager/pkg/issuer/acme/dns/util"
    logf "github.com/cert-manager/cert-manager/pkg/logs"
    "github.com/cert-manager/cert-manager/pkg/metrics"
@@ -127,6 +128,14 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
            return nil
        })
    }
    healthzListener, err := net.Listen("tcp", opts.HealthzListenAddress)
    if err != nil {
        return fmt.Errorf("failed to listen on healthz address %s: %v", opts.HealthzListenAddress, err)
    }
    healthzServer := healthz.NewServer(opts.HealthzLeaderElectionTimeout)
    g.Go(func() error {
        return healthzServer.Start(rootCtx, healthzListener)
    })

    elected := make(chan struct{})
    if opts.LeaderElect {
@@ -136,7 +145,6 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
        if err != nil {
            return err
        }

        errorCh := make(chan error, 1)
        if err := startLeaderElection(rootCtx, opts, ctx.Client, ctx.Recorder, leaderelection.LeaderCallbacks{
            OnStartedLeading: func(_ context.Context) {
@@ -151,7 +159,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
                    errorCh <- errors.New("leader election lost")
                }
            },
        }); err != nil {
        }, healthzServer.LeaderHealthzAdaptor); err != nil {
            return err
        }

@@ -319,7 +327,7 @@ func buildControllerContextFactory(ctx context.Context, opts *options.Controller
    return ctxFactory, nil
}

func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error {
func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks, healthzAdaptor *leaderelection.HealthzAdaptor) error {
    // Identity used to distinguish between multiple controller manager instances
    id, err := os.Hostname()
    if err != nil {
@@ -353,6 +361,7 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l
        RetryPeriod:     opts.LeaderElectionRetryPeriod,
        ReleaseOnCancel: true,
        Callbacks:       callbacks,
        WatchDog:        healthzAdaptor,
    })
    if err != nil {
        return err

@@ -110,6 +110,12 @@ type ControllerOptions struct {
    // The host and port address, separated by a ':', that the Prometheus server
    // should expose metrics on.
    MetricsListenAddress string
    // The host and port address, separated by a ':', that the healthz server
    // should listen on.
    HealthzListenAddress string
    // Leader election healthz checks within this timeout period after the lease
    // expires will still return healthy.
    HealthzLeaderElectionTimeout time.Duration
    // PprofAddress is the address on which Go profiler will run. Should be
    // in form <host>:<port>.
    PprofAddress string
@@ -150,6 +156,11 @@ const (
    defaultMaxConcurrentChallenges = 60

    defaultPrometheusMetricsServerAddress = "0.0.0.0:9402"
    defaultHealthzServerAddress           = "127.0.0.1:10257"
    // This default value is the same as used in Kubernetes controller-manager.
    // See:
    // https://github.com/kubernetes/kubernetes/blob/806b30170c61a38fedd54cc9ede4cd6275a1ad3b/cmd/kube-controller-manager/app/controllermanager.go#L202-L209
    defaultHealthzLeaderElectionTimeout = 20 * time.Second

    // default time period to wait between checking DNS01 and HTTP01 challenge propagation
    defaultDNS01CheckRetryPeriod = 10 * time.Second
@@ -251,6 +262,8 @@ func NewControllerOptions() *ControllerOptions {
        DNS01RecursiveNameserversOnly: defaultDNS01RecursiveNameserversOnly,
        EnableCertificateOwnerRef:     defaultEnableCertificateOwnerRef,
        MetricsListenAddress:          defaultPrometheusMetricsServerAddress,
        HealthzListenAddress:          defaultHealthzServerAddress,
        HealthzLeaderElectionTimeout:  defaultHealthzLeaderElectionTimeout,
        NumberOfConcurrentWorkers:     defaultNumberOfConcurrentWorkers,
        MaxConcurrentChallenges:       defaultMaxConcurrentChallenges,
        DNS01CheckRetryPeriod:         defaultDNS01CheckRetryPeriod,
@@ -375,6 +388,11 @@ func (s *ControllerOptions) AddFlags(fs *pflag.FlagSet) {

    fs.StringVar(&s.MetricsListenAddress, "metrics-listen-address", defaultPrometheusMetricsServerAddress, ""+
        "The host and port that the metrics endpoint should listen on.")
    fs.StringVar(&s.HealthzListenAddress, "healthz-listen-address", defaultHealthzServerAddress, ""+
        "The host and port that the healthz server should listen on. "+
        "The healthz server serves the /livez endpoint, which is called by the LivenessProbe.")
    fs.DurationVar(&s.HealthzLeaderElectionTimeout, "healthz-leader-election-timeout", defaultHealthzLeaderElectionTimeout, ""+
        "Leader election healthz checks within this timeout period after the lease expires will still return healthy")
    fs.BoolVar(&s.EnablePprof, "enable-profiling", cmdutil.DefaultEnableProfiling, ""+
        "Enable profiling for controller.")
    fs.StringVar(&s.PprofAddress, "profiler-address", cmdutil.DefaultProfilerAddr,

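To make the new --healthz-leader-election-timeout flag concrete, here is a rough, self-contained Go sketch of the timing rule its help text describes. This is an illustration only; the real check is performed by client-go's leaderelection.HealthzAdaptor, and the livezHealthy helper below is hypothetical.

package main

import (
    "fmt"
    "time"
)

// livezHealthy is a hypothetical helper modelling the rule that the probe keeps
// reporting healthy until the lease has been expired for longer than the
// configured healthz timeout (defaultHealthzLeaderElectionTimeout, 20s).
func livezHealthy(lastRenew time.Time, leaseDuration, healthzTimeout time.Duration) bool {
    leaseExpiry := lastRenew.Add(leaseDuration)
    return time.Since(leaseExpiry) <= healthzTimeout
}

func main() {
    const (
        leaseDuration  = 60 * time.Second
        healthzTimeout = 20 * time.Second
    )
    fmt.Println(livezHealthy(time.Now().Add(-70*time.Second), leaseDuration, healthzTimeout)) // true: lease expired 10s ago
    fmt.Println(livezHealthy(time.Now().Add(-90*time.Second), leaseDuration, healthzTimeout)) // false: lease expired 30s ago
}
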
@@ -10,6 +10,7 @@ require (
    github.com/spf13/pflag v1.0.5
    golang.org/x/sync v0.1.0
    k8s.io/apimachinery v0.26.3
    k8s.io/apiserver v0.26.3
    k8s.io/client-go v0.26.3
    k8s.io/utils v0.0.0-20230313181309-38a27ef9d749
)
@@ -158,7 +159,6 @@ require (
    gopkg.in/yaml.v3 v3.0.1 // indirect
    k8s.io/api v0.26.3 // indirect
    k8s.io/apiextensions-apiserver v0.26.3 // indirect
    k8s.io/apiserver v0.26.3 // indirect
    k8s.io/component-base v0.26.3 // indirect
    k8s.io/klog/v2 v2.90.1 // indirect
    k8s.io/kube-aggregator v0.26.3 // indirect

@@ -158,6 +158,20 @@ spec:
          resources:
            {{- toYaml . | nindent 12 }}
          {{- end }}
          # LivenessProbe settings are based on those used for the Kubernetes
          # controller-manager. See:
          # https://github.com/kubernetes/kubernetes/blob/806b30170c61a38fedd54cc9ede4cd6275a1ad3b/cmd/kubeadm/app/util/staticpod/utils.go#L241-L245
          livenessProbe:
            httpGet:
              host: 127.0.0.1
              port: 10257
              path: /livez
              scheme: HTTP
            initialDelaySeconds: 10
            periodSeconds: 10
            timeoutSeconds: 15
            successThreshold: 1
            failureThreshold: 8
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}

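For orientation on the probe settings above (my arithmetic, not part of the chart): the kubelet restarts the container only after failureThreshold consecutive probe failures, one every periodSeconds, so a wedged controller is restarted roughly 80 seconds after /livez starts failing. A trivial sketch of that arithmetic:

package main

import "fmt"

func main() {
    const (
        periodSeconds    = 10
        failureThreshold = 8
    )
    // failureThreshold consecutive failures, one probe every periodSeconds.
    fmt.Printf("restart after roughly %d seconds of failing /livez probes\n", periodSeconds*failureThreshold)
}
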
pkg/healthz/doc.go (new file, 30 lines)
@@ -0,0 +1,30 @@
/*
Copyright 2020 The cert-manager Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package healthz provides an HTTP server which responds to HTTP liveness probes
// and performs health checks.
//
// Currently it only checks that the LeaderElector has an up to date LeaderElectionRecord.
// Normally the parent process should exit if the LeaderElectionRecord is stale,
// but it is possible that the process is prevented from exiting by a bug,
// in which case this check will fail, the liveness probe will fail and then the
// Kubelet will restart the process.
// See the following issue and PR to understand how this problem was solved in
// Kubernetes:
// * [kube-controller-manager becomes deadlocked but still passes healthcheck](https://github.com/kubernetes/kubernetes/issues/70819)
// * [Report KCM as unhealthy if leader election is wedged](https://github.com/kubernetes/kubernetes/pull/70971)

package healthz
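As a reading aid, the sketch below condenses the wiring performed by the Run changes at the top of this commit: listen on the healthz address, start the server, and hand the adaptor to the leader elector as its WatchDog. The function name and hard-coded values are illustrative, not cert-manager API.

package main

import (
    "context"
    "net"
    "time"

    "golang.org/x/sync/errgroup"
    "k8s.io/client-go/tools/leaderelection"

    "github.com/cert-manager/cert-manager/pkg/healthz"
)

// runHealthzExample is a hypothetical stand-in for the relevant part of Run.
func runHealthzExample(ctx context.Context) error {
    listener, err := net.Listen("tcp", "127.0.0.1:10257") // defaultHealthzServerAddress
    if err != nil {
        return err
    }
    healthzServer := healthz.NewServer(20 * time.Second) // defaultHealthzLeaderElectionTimeout

    g, gctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        // Serves /livez until the context is cancelled.
        return healthzServer.Start(gctx, listener)
    })

    // The adaptor is passed to the leader elector as its WatchDog so that
    // /livez/leaderElection reflects the LeaderElector's state.
    _ = leaderelection.LeaderElectionConfig{
        WatchDog: healthzServer.LeaderHealthzAdaptor,
        // Lock, durations and callbacks omitted here; see Run above.
    }

    return g.Wait()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()
    if err := runHealthzExample(ctx); err != nil {
        panic(err)
    }
}
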
pkg/healthz/healthz.go (new file, 87 lines)
@@ -0,0 +1,87 @@
/*
Copyright 2020 The cert-manager Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthz

import (
    "context"
    "errors"
    "net"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
    "k8s.io/apiserver/pkg/server/healthz"
    "k8s.io/client-go/tools/leaderelection"
)

const (
    // Copied from pkg/metrics/metrics.go
    healthzServerReadTimeout    = 8 * time.Second
    healthzServerWriteTimeout   = 8 * time.Second
    healthzServerMaxHeaderBytes = 1 << 20 // 1 MiB
)

// Server responds to HTTP requests to a /livez endpoint and responds with an
// error if the LeaderElector has exited or has not observed the
// LeaderElectionRecord for a given amount of time.
type Server struct {
    server *http.Server
    // LeaderHealthzAdaptor is public so that it can be retrieved by the caller
    // and used as the value for `LeaderElectionConfig.WatchDog` when
    // initializing the LeaderElector.
    LeaderHealthzAdaptor *leaderelection.HealthzAdaptor
}

// NewServer creates a new healthz.Server.
// The supplied leaderElectionHealthzAdaptorTimeout controls how long after the
// leader lease expires the leader election will be considered to have failed.
func NewServer(leaderElectionHealthzAdaptorTimeout time.Duration) *Server {
    leaderHealthzAdaptor := leaderelection.NewLeaderHealthzAdaptor(leaderElectionHealthzAdaptorTimeout)
    mux := http.NewServeMux()
    healthz.InstallLivezHandler(mux, leaderHealthzAdaptor)
    return &Server{
        server: &http.Server{
            ReadTimeout:    healthzServerReadTimeout,
            WriteTimeout:   healthzServerWriteTimeout,
            MaxHeaderBytes: healthzServerMaxHeaderBytes,
            Handler:        mux,
        },
        LeaderHealthzAdaptor: leaderHealthzAdaptor,
    }
}

// Start makes the server listen on the supplied socket, until the supplied
// context is cancelled, after which the server will gracefully shut down and
// Start will return.
// The server is given 5 seconds to shut down gracefully.
func (o *Server) Start(ctx context.Context, l net.Listener) error {
    var g errgroup.Group
    g.Go(func() error {
        if err := o.server.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) {
            return err
        }
        return nil
    })
    g.Go(func() error {
        <-ctx.Done()
        // allow a timeout for graceful shutdown
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return o.server.Shutdown(shutdownCtx)
    })
    return g.Wait()
}
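A small usage sketch for the Server above, showing the start / probe / graceful-shutdown cycle. The expected "200 ok" relies on the behaviour documented in the tests below (the adaptor reports healthy while no LeaderElector has been registered); treat it as an illustration rather than an exact transcript.

package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
    "time"

    "github.com/cert-manager/cert-manager/pkg/healthz"
)

func main() {
    l, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        panic(err)
    }
    s := healthz.NewServer(20 * time.Second)

    ctx, cancel := context.WithCancel(context.Background())
    done := make(chan error, 1)
    go func() { done <- s.Start(ctx, l) }()

    // While no LeaderElector is registered on the adaptor, /livez reports ok.
    resp, err := http.Get("http://" + l.Addr().String() + "/livez")
    if err != nil {
        panic(err)
    }
    body, _ := io.ReadAll(resp.Body)
    resp.Body.Close()
    fmt.Println(resp.StatusCode, string(body))

    // Cancelling the context triggers the graceful shutdown path in Start.
    cancel()
    fmt.Println("Start returned:", <-done)
}
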
pkg/healthz/healthz_test.go (new file, 359 lines)
@@ -0,0 +1,359 @@
/*
Copyright 2020 The cert-manager Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthz_test

import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
    "k8s.io/klog/v2/ktesting"
    _ "k8s.io/klog/v2/ktesting/init" // add command line flags

    "github.com/cert-manager/cert-manager/pkg/healthz"
)

const (
    localIdentity   = "local-node"
    remoteIdentity  = "remote-node"
    lockDescription = "fake-resource-lock"
)

// TestHealthzLivez checks the responses of the `/livez` endpoint.
//
// These tests are intended to demonstrate that the LeaderElectionHealthzAdaptor
// does indeed cause the `/livez` endpoint to return errors if the healthz
// server continues to run after the LeaderElector go-routine has exited.
func TestHealthzLivez(t *testing.T) {

    type input struct {
        leaderElectionEnabled bool
        resourceLock          *fakeResourceLock
        onNewLeaderHook       func(in *input)
    }

    type output struct {
        responseBody string
        responseCode int
    }

    type testCase struct {
        name string
        in   input
        out  output
    }

    tests := []testCase{
        {
            // OK: when leader-election is disabled (--leader-elect=false) the leader
            // election healthz adaptor always returns OK.
            //
            // LeaderElectionHealthzAdaptor.Check returns nil if its
            // LeaderElector pointer has not been set.
            // See https://github.com/kubernetes/client-go/blob/8cbca742aebe24b24f7f4e32fd999942fa9133e8/tools/leaderelection/healthzadaptor.go#L43-L52
            name: "ok-leader-election-disabled",
            in: input{
                leaderElectionEnabled: false,
                resourceLock:          nil,
            },
            out: output{
                responseBody: "ok",
                responseCode: http.StatusOK,
            },
        },
        {
            // OK: when the local node is leader and has updated the leader
            // election record.
            name: "ok-local-leader",
            in: input{
                leaderElectionEnabled: true,
                resourceLock: &fakeResourceLock{
                    record: &resourcelock.LeaderElectionRecord{
                        HolderIdentity: localIdentity,
                    },
                },
            },
            out: output{
                responseBody: "ok",
                responseCode: http.StatusOK,
            },
        },
        {
            // OK: when a remote node is leader and has updated the leader
            // election record.
            //
            // LeaderElector.Check always succeeds when another node has the
            // leader lock.
            // See https://github.com/kubernetes/client-go/blob/8cbca742aebe24b24f7f4e32fd999942fa9133e8/tools/leaderelection/leaderelection.go#L385-L399
            name: "ok-remote-leader",
            in: input{
                leaderElectionEnabled: true,
                resourceLock: &fakeResourceLock{
                    record: &resourcelock.LeaderElectionRecord{
                        HolderIdentity: remoteIdentity,
                    },
                },
            },
            out: output{
                responseBody: "ok",
                responseCode: http.StatusOK,
            },
        },
        {
            // Failure: when update starts to fail after the local node has once
            // acquired the leader election lock.
            //
            // This is intended to simulate the situation where the
            // LeaderElector go-routine has exited, but the parent process is
            // wedged and has not exited.
            // In this situation, the /livez endpoint responds with an error,
            // because the LeaderElectionHealthzAdaptor still has a reference to
            // the no-longer running LeaderElector and its last state.
            //
            // Start LeaderElector without a LeaderElectionRecord, wait for the
            // record to be created, and then when LeaderElector calls the
            // OnNewLeader callback, set the fakeResourceLock to return an error
            // when Update is called.
            // This persistent error causes `LeaderElector.renew` to exit and
            // causes LeaderElector.Run to exit after the `RenewDeadline`.
            //
            // The LeaderElection go-routine will exit but the healthz server
            // will continue running.
            name: "fail-delayed-update-error",
            in: input{
                leaderElectionEnabled: true,
                resourceLock: &fakeResourceLock{
                    record: nil,
                },
                onNewLeaderHook: func(in *input) {
                    in.resourceLock.updateError = fmt.Errorf("simulated-delayed-update-error")
                },
            },
            out: output{
                responseBody: "internal server error: failed election to renew leadership on lease \n",
                responseCode: http.StatusInternalServerError,
            },
        },
        {
            // Failure: when the local node attempts to acquire the lease but fails to update
            // the leader election record.
            //
            // Like the fail-delayed-update-error test, this is intended to
            // cause the LeaderElector to exit, leaving the healthz server
            // running and querying the last state of the exited LeaderElector.
            //
            // In this simulation, there is already a LeaderElectionRecord belonging to the local node,
            // and the update is simulated to fail on the first attempt.
            //
            // TODO(wallrj): This test may be redundant because it has the same
            // effect as `fail-delayed-update-error`, in causing the running
            // LeaderElector to exit.
            name: "fail-immediate-update-error",
            in: input{
                leaderElectionEnabled: true,
                resourceLock: &fakeResourceLock{
                    record: &resourcelock.LeaderElectionRecord{
                        HolderIdentity: localIdentity,
                    },
                    updateError: fmt.Errorf("simulated-update-error"),
                },
            },
            out: output{
                responseBody: "internal server error: failed election to renew leadership on lease \n",
                responseCode: http.StatusInternalServerError,
            },
        },
    }

    for _, tc := range tests {
        tc := tc
        t.Run(tc.name, func(t *testing.T) {
            t.Parallel()

            log, ctx := ktesting.NewTestContext(t)

            ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
            defer cancel()

            l, err := net.Listen("tcp", "127.0.0.1:0")
            require.NoError(t, err)

            livezURL := "http://" + l.Addr().String() + "/livez/leaderElection"

            const leaderElectionHealthzAdaptorTimeout = time.Millisecond
            s := healthz.NewServer(leaderElectionHealthzAdaptorTimeout)

            g, gCTX := errgroup.WithContext(ctx)

            leaderElected := make(chan struct{})

            if tc.in.leaderElectionEnabled {
                const (
                    leaseDuration = 500 * time.Millisecond
                    renewDeadline = 400 * time.Millisecond
                    retryPeriod   = 300 * time.Millisecond
                )

                log.Info(
                    "Starting leader election go-routine",
                    "leaseDuration", leaseDuration,
                    "renewDeadline", renewDeadline,
                    "retryPeriod", retryPeriod,
                )
                g.Go(func() error {
                    defer log.Info("Leader election go-routine finished")
                    leaderelection.RunOrDie(gCTX, leaderelection.LeaderElectionConfig{
                        LeaseDuration: leaseDuration,
                        RenewDeadline: renewDeadline,
                        RetryPeriod:   retryPeriod,
                        Callbacks: leaderelection.LeaderCallbacks{
                            OnStartedLeading: func(context.Context) {
                                log.Info("leaderelection.LeaderCallbacks.OnStartedLeading")
                            },
                            OnStoppedLeading: func() {
                                log.Info("leaderelection.LeaderCallbacks.OnStoppedLeading")
                            },
                            OnNewLeader: func(identity string) {
                                log.Info("leaderelection.LeaderCallbacks.OnNewLeader", "identity", identity)
                                if tc.in.onNewLeaderHook != nil {
                                    tc.in.onNewLeaderHook(&tc.in)
                                }
                                close(leaderElected)
                            },
                        },
                        Lock:     tc.in.resourceLock,
                        WatchDog: s.LeaderHealthzAdaptor,
                    })
                    return nil
                })
            }

            log.Info("Starting healthz server go-routine")
            g.Go(func() error {
                defer log.Info("Healthz server go-routine finished")
                return s.Start(gCTX, l)
            })

            if tc.in.leaderElectionEnabled {
                log.Info("Waiting for a LeaderElector to know the current leader before polling liveness endpoint")
                <-leaderElected
            }

            const (
                pollingInterval = 500 * time.Millisecond
                pollingTimeout  = 3 * time.Second
            )
            log.Info(
                "Polling liveness endpoint",
                "url", livezURL,
                "interval", pollingInterval,
                "timeout", pollingTimeout,
            )
            var (
                lastResponseCode int
                lastResponseBody string
            )
            assert.Eventually(t, func() bool {
                resp, err := http.Get(livezURL)
                require.NoError(t, err)
                defer func() {
                    require.NoError(t, resp.Body.Close())
                }()
                bodyBytes, err := ioutil.ReadAll(resp.Body)
                require.NoError(t, err)

                lastResponseCode = resp.StatusCode
                lastResponseBody = string(bodyBytes)

                log.Info("liveness-probe", "response-code", lastResponseCode, "response-body", lastResponseBody)

                return tc.out.responseCode == lastResponseCode && tc.out.responseBody == lastResponseBody
            }, pollingTimeout, pollingInterval)

            assert.Equal(t, tc.out.responseBody, lastResponseBody)
            assert.Equal(t, tc.out.responseCode, lastResponseCode)
            cancel()
            require.NoError(t, g.Wait())
        })
    }
}

// fakeResourceLock implements resourcelock.Interface sufficiently to simulate:
// * successful acquisition of the leader election lock by the local node,
// * current possession of the leader election lock by a remote node, and
// * failures in leader election which cause the `LeaderElection.Run` function to exit.
//
// The intention is to be able to test the behavior of the
// LeaderElectionHealthzAdaptor under those circumstances.
type fakeResourceLock struct {
    record      *resourcelock.LeaderElectionRecord
    getError    error
    updateError error
}

func (o *fakeResourceLock) Identity() string {
    return localIdentity
}

func (o *fakeResourceLock) Describe() string {
    return lockDescription
}

func (o *fakeResourceLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
    klog.FromContext(ctx).WithName("fakeResourceLock").Info("Get")
    if o.getError != nil {
        return nil, nil, o.getError
    }
    if o.record == nil {
        err := errors.NewNotFound(schema.ParseGroupResource("configmap"), "foo")
        return nil, nil, err
    }
    lerByte, err := json.Marshal(*o.record)
    if err != nil {
        return nil, nil, err
    }
    return o.record, lerByte, nil
}

func (o *fakeResourceLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
    klog.FromContext(ctx).WithName("fakeResourceLock").Info("Create")
    o.record = &ler
    return nil
}

func (o *fakeResourceLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
    klog.FromContext(ctx).WithName("fakeResourceLock").Info("Update")
    o.record = &ler
    return o.updateError
}

func (o *fakeResourceLock) RecordEvent(_ string) {}

var _ resourcelock.Interface = &fakeResourceLock{}