Skip to content

Commit

Permalink
etcdserver: make livez return ok when defrag is active.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Nov 14, 2023
1 parent b343231 commit f7a2eef
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 17 deletions.
15 changes: 12 additions & 3 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ServerHealth interface {
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
IsDefragActive() bool
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
Expand Down Expand Up @@ -207,14 +208,14 @@ type CheckRegistry struct {

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, true /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, false /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

Expand Down Expand Up @@ -355,8 +356,16 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
func serializableReadCheck(srv ServerHealth, skipCheckDuringDefrag bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
// skips the check if defrag is active.
if skipCheckDuringDefrag && srv.IsDefragActive() {
return nil
}
// returns early if defrag is active.
if srv.IsDefragActive() {
return fmt.Errorf("defrag is active")
}
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
Expand Down
38 changes: 30 additions & 8 deletions server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (

type fakeHealthServer struct {
fakeServer
apiError error
missingLeader bool
authStore auth.AuthStore
apiError error
missingLeader bool
authStore auth.AuthStore
isDefragActive bool
}

func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
Expand All @@ -58,6 +59,10 @@ func (s *fakeHealthServer) Leader() types.ID {
return types.ID(raft.None)
}

// IsDefragActive returns the isDefragActive value configured on the fake, so
// a test case can choose whether a defragmentation appears to be in progress.
func (s *fakeHealthServer) IsDefragActive() bool {
return s.isDefragActive
}

// AuthStore returns the auth store the fake was constructed with.
func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }

// ClientCertAuthEnabled always reports false for the fake server.
func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }
Expand All @@ -69,9 +74,10 @@ type healthTestCase struct {
inResult []string
notInResult []string

alarms []*pb.AlarmMember
apiError error
missingLeader bool
alarms []*pb.AlarmMember
apiError error
missingLeader bool
isDefragActive bool
}

func TestHealthHandler(t *testing.T) {
Expand Down Expand Up @@ -278,14 +284,30 @@ func TestSerializableReadCheck(t *testing.T) {
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
{
name: "Alive if defrag is active and range api is not available",
healthCheckURL: "/livez",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if defrag is active and range api is not available",
healthCheckURL: "/readyz",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: defrag is active"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
isDefragActive: tt.isDefragActive,
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
Expand Down
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer

hsrv := health.NewServer()
healthNotifier := newHealthNotifier(hsrv, s)
s.HealthNtf = healthNotifier
healthpb.RegisterHealthServer(grpcServer, hsrv)
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, healthNotifier))

Expand Down
24 changes: 20 additions & 4 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package v3rpc

import (
"sync"

"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -29,6 +31,7 @@ const (
// notifier receives defragmentation start/finish notifications and reports
// whether a defragmentation is currently in progress.
type notifier interface {
// defragStarted signals that a defragmentation has begun.
defragStarted()
// defragFinished signals that a defragmentation has ended.
defragFinished()
// IsDefragActive reports whether a defragmentation is in progress.
IsDefragActive() bool
}

func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
Expand All @@ -43,20 +46,33 @@ func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
}

type healthNotifier struct {
hs *health.Server
lg *zap.Logger

hs *health.Server
lg *zap.Logger
defragMu sync.RWMutex
isDefragActive bool
stopGRPCServiceOnDefrag bool
}

// IsDefragActive reports whether a defragmentation is currently in progress,
// i.e. defragStarted has been called without a subsequent defragFinished.
//
// isDefragActive is written by defragStarted and defragFinished while they
// hold defragMu, and this method is called from a different goroutine (the
// health check path), so the flag must be read under the read lock to avoid
// a data race with those writers.
func (hc *healthNotifier) IsDefragActive() bool {
	hc.defragMu.RLock()
	defer hc.defragMu.RUnlock()
	return hc.isDefragActive
}

// defragStarted records that a defragmentation is in progress and, when the
// notifier is configured with stopGRPCServiceOnDefrag, stops serving with the
// reason "defrag is active".
//
// defragMu is held for the whole call, so the flag update and the optional
// stopServe call happen together with respect to defragFinished.
func (hc *healthNotifier) defragStarted() {
	hc.defragMu.Lock()
	defer hc.defragMu.Unlock()

	hc.isDefragActive = true
	if hc.stopGRPCServiceOnDefrag {
		hc.stopServe("defrag is active")
	}
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }
// defragFinished clears the defrag-in-progress flag and resumes serving.
// defragMu is held for the whole call so the flag update and startServe are
// not interleaved with defragStarted.
func (hc *healthNotifier) defragFinished() {
hc.defragMu.Lock()
defer hc.defragMu.Unlock()
hc.isDefragActive = false
// startServe is called unconditionally, regardless of stopGRPCServiceOnDefrag.
hc.startServe()
}

func (hc *healthNotifier) startServe() {
hc.lg.Info(
Expand Down
26 changes: 26 additions & 0 deletions server/etcdserver/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

// HealthNotifier exposes the health-related state that EtcdServer consults
// through its HealthNtf field.
type HealthNotifier interface {
// IsDefragActive reports whether a defragmentation is in progress.
IsDefragActive() bool
}

// IsDefragActive reports whether a defragmentation is in progress according
// to the server's HealthNtf. When no notifier has been set (HealthNtf is
// nil) it reports false.
func (s *EtcdServer) IsDefragActive() bool {
	if ntf := s.HealthNtf; ntf != nil {
		return ntf.IsDefragActive()
	}
	return false
}
2 changes: 2 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ type EtcdServer struct {
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
corruptionChecker CorruptionChecker

HealthNtf HealthNotifier
}

// NewServer creates a new EtcdServer from the supplied configuration. The
Expand Down
31 changes: 29 additions & 2 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (
)

const (
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
defragCommandTimeout = 200 * time.Millisecond
)

type healthCheckConfig struct {
Expand Down Expand Up @@ -279,6 +280,27 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
},
},
},
// verify that when defrag is active livez is ok and readyz is not ok.
{
name: "defrag active",
injectFailure: triggerSlowDefrag,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
expectedRespSubStrings: []string{
`[-]serializable_read failed: defrag is active`,
`[+]data_corruption ok`,
},
},
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -375,6 +397,11 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: putCommandTimeout})
}

// triggerSlowDefrag configures the "defragBeforeRename" failpoint on the first
// cluster member to sleep for duration, then issues a defragment command
// against that member with a client timeout of defragCommandTimeout.
//
// Setting up the failpoint must succeed; the result of the defragment command
// itself is intentionally not inspected.
func triggerSlowDefrag(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
	member := clus.Procs[0]
	sleepTerm := fmt.Sprintf(`sleep("%s")`, duration)

	setupErr := member.Failpoints().SetupHTTP(ctx, "defragBeforeRename", sleepTerm)
	require.NoError(t, setupErr)

	member.Etcdctl().Defragment(ctx, config.DefragOption{Timeout: defragCommandTimeout})
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := clus.Procs[0].Etcdctl()
for i := 0; i < 10; i++ {
Expand Down

0 comments on commit f7a2eef

Please sign in to comment.