diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 234580805b40..f430872bfa6d 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -44,6 +44,7 @@ type ServerHealth interface { Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) Config() config.ServerConfig AuthStore() auth.AuthStore + IsDefragActive() bool } // HandleHealth registers metrics and health handlers. it checks health by using v3 range request @@ -207,14 +208,14 @@ type CheckRegistry struct { func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)} - reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("serializable_read", serializableReadCheck(server, true /* skipCheckDuringDefrag */)) reg.InstallHttpEndpoints(lg, mux) } func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)} reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) - reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("serializable_read", serializableReadCheck(server, false /* skipCheckDuringDefrag */)) reg.InstallHttpEndpoints(lg, mux) } @@ -355,8 +356,16 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e } } -func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { +func serializableReadCheck(srv ServerHealth, skipCheckDuringDefrag bool) func(ctx context.Context) error { return func(ctx context.Context) error { + // skips the check if defrag is active. + if skipCheckDuringDefrag && srv.IsDefragActive() { + return nil + } + // returns early if defrag is active. 
+ if srv.IsDefragActive() { + return fmt.Errorf("defrag is active") + } ctx = srv.AuthStore().WithRoot(ctx) _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true}) if err != nil { diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 122fbf6adcf5..03d811aaa3de 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -38,9 +38,10 @@ import ( type fakeHealthServer struct { fakeServer - apiError error - missingLeader bool - authStore auth.AuthStore + apiError error + missingLeader bool + authStore auth.AuthStore + isDefragActive bool } func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -58,6 +59,10 @@ func (s *fakeHealthServer) Leader() types.ID { return types.ID(raft.None) } +func (s *fakeHealthServer) IsDefragActive() bool { + return s.isDefragActive +} + func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } @@ -69,9 +74,10 @@ type healthTestCase struct { inResult []string notInResult []string - alarms []*pb.AlarmMember - apiError error - missingLeader bool + alarms []*pb.AlarmMember + apiError error + missingLeader bool + isDefragActive bool } func TestHealthHandler(t *testing.T) { @@ -278,14 +284,30 @@ func TestSerializableReadCheck(t *testing.T) { expectStatusCode: http.StatusServiceUnavailable, inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, }, + { + name: "Alive if defrag is active and range api is not available", + healthCheckURL: "/livez", + isDefragActive: true, + apiError: fmt.Errorf("timeout error"), + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if defrag is active and range api is not available", + healthCheckURL: "/readyz", + isDefragActive: true, + apiError: fmt.Errorf("timeout error"), + expectStatusCode: 
http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: defrag is active"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mux := http.NewServeMux() logger := zaptest.NewLogger(t) s := &fakeHealthServer{ - apiError: tt.apiError, - authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), + apiError: tt.apiError, + authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), + isDefragActive: tt.isDefragActive, } HandleHealth(logger, mux, s) ts := httptest.NewServer(mux) diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index d4368c50354c..aece8668c8ac 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -78,6 +78,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer hsrv := health.NewServer() healthNotifier := newHealthNotifier(hsrv, s) + s.HealthNtf = healthNotifier healthpb.RegisterHealthServer(grpcServer, hsrv) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, healthNotifier)) diff --git a/server/etcdserver/api/v3rpc/health.go b/server/etcdserver/api/v3rpc/health.go index e87140d17432..44b43f5a8dea 100644 --- a/server/etcdserver/api/v3rpc/health.go +++ b/server/etcdserver/api/v3rpc/health.go @@ -15,6 +15,8 @@ package v3rpc import ( + "sync" + "go.uber.org/zap" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -29,6 +31,7 @@ const ( type notifier interface { defragStarted() defragFinished() + IsDefragActive() bool } func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier { @@ -43,20 +46,33 @@ func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier { } type healthNotifier struct { - hs *health.Server - lg *zap.Logger - + hs *health.Server + lg *zap.Logger + defragMu sync.RWMutex + isDefragActive bool stopGRPCServiceOnDefrag bool } +func (hc *healthNotifier) 
IsDefragActive() bool { + hc.defragMu.RLock(); defer hc.defragMu.RUnlock(); return hc.isDefragActive +} + func (hc *healthNotifier) defragStarted() { + hc.defragMu.Lock() + defer hc.defragMu.Unlock() + hc.isDefragActive = true if !hc.stopGRPCServiceOnDefrag { return } hc.stopServe("defrag is active") } -func (hc *healthNotifier) defragFinished() { hc.startServe() } +func (hc *healthNotifier) defragFinished() { + hc.defragMu.Lock() + defer hc.defragMu.Unlock() + hc.isDefragActive = false + hc.startServe() +} func (hc *healthNotifier) startServe() { hc.lg.Info( diff --git a/server/etcdserver/health.go b/server/etcdserver/health.go new file mode 100644 index 000000000000..820a88299c64 --- /dev/null +++ b/server/etcdserver/health.go @@ -0,0 +1,26 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +type HealthNotifier interface { + IsDefragActive() bool +} + +func (s *EtcdServer) IsDefragActive() bool { + if s.HealthNtf == nil { + return false + } + return s.HealthNtf.IsDefragActive() +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4b40e32bada2..ff9781ac39c9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -294,6 +294,8 @@ type EtcdServer struct { // Should only be set within apply code path. Used to force snapshot after cluster version downgrade. 
forceSnapshot bool corruptionChecker CorruptionChecker + + HealthNtf HealthNotifier } // NewServer creates a new EtcdServer from the supplied configuration. The diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go index 9cc090f2395c..b0c55c76db6b 100644 --- a/tests/e2e/http_health_check_test.go +++ b/tests/e2e/http_health_check_test.go @@ -37,8 +37,9 @@ import ( ) const ( - healthCheckTimeout = 2 * time.Second - putCommandTimeout = 200 * time.Millisecond + healthCheckTimeout = 2 * time.Second + putCommandTimeout = 200 * time.Millisecond + defragCommandTimeout = 200 * time.Millisecond ) type healthCheckConfig struct { @@ -279,6 +280,27 @@ func TestHTTPLivezReadyzHandler(t *testing.T) { }, }, }, + // verify that when defrag is active livez is ok and readyz is not ok. + { + name: "defrag active", + injectFailure: triggerSlowDefrag, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)}, + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + expectedRespSubStrings: []string{ + `[-]serializable_read failed: defrag is active`, + `[+]data_corruption ok`, + }, + }, + }, + }, } for _, tc := range tcs { @@ -375,6 +397,11 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: putCommandTimeout}) } +func triggerSlowDefrag(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "defragBeforeRename", fmt.Sprintf(`sleep("%s")`, duration))) + clus.Procs[0].Etcdctl().Defragment(ctx, config.DefragOption{Timeout: defragCommandTimeout}) +} + func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, 
_ time.Duration) { etcdctl := clus.Procs[0].Etcdctl() for i := 0; i < 10; i++ {