etcdserver: make livez return ok when defrag is active.
Signed-off-by: Siyuan Zhang <sizhang@google.com>
siyuanfoundation committed Nov 16, 2023
1 parent d0114cf commit a652d02
Showing 7 changed files with 144 additions and 29 deletions.
46 changes: 39 additions & 7 deletions server/etcdserver/api/etcdhttp/health.go
@@ -25,6 +25,7 @@ import (
"net/http"
"path"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -33,6 +34,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/raft/v3"
)

@@ -52,11 +54,16 @@ type ServerHealth interface {
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
Backend() backend.Backend
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
notifier := &healthNotifier{}
if srv.Backend() != nil {
srv.Backend().SubscribeDefragNotifier(notifier)
}
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
return h
@@ -67,8 +74,8 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
return checkAPI(ctx, lg, srv, serializable)
}))

installLivezEndpoints(lg, mux, srv)
installReadyzEndpoints(lg, mux, srv)
installLivezEndpoints(lg, mux, srv, notifier)
installReadyzEndpoints(lg, mux, srv, notifier)
}

// NewHealthHandler handles '/health' requests.
@@ -237,16 +244,16 @@ type CheckRegistry struct {
checks map[string]HealthCheck
}

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth, notifier *healthNotifier) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, notifier, true /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth, notifier *healthNotifier) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, notifier, false /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

@@ -410,8 +417,33 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
type healthNotifier struct {
lock sync.RWMutex
isDefragActive bool
}

func (n *healthNotifier) DefragStarted() {
n.lock.Lock()
defer n.lock.Unlock()
n.isDefragActive = true
}

func (n *healthNotifier) DefragFinished() {
n.lock.Lock()
defer n.lock.Unlock()
n.isDefragActive = false
}

func serializableReadCheck(srv ServerHealth, notifier *healthNotifier, skipCheckDuringDefrag bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
// skip the serializable read while defrag is active: livez stays ok,
// readyz reports failure until defrag finishes. Read the flag under the
// notifier's lock to avoid racing with DefragStarted/DefragFinished.
notifier.lock.RLock()
defragActive := notifier.isDefragActive
notifier.lock.RUnlock()
if defragActive {
if skipCheckDuringDefrag {
return nil
}
return fmt.Errorf("defrag is active")
}
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
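Note: to make the livez/readyz split concrete, below is a minimal, self-contained sketch of the branch added above. The check helper and its wiring are illustrative stand-ins, not code from this commit.

package main

import "fmt"

// healthNotifier mirrors the struct added to health.go; check mirrors the
// defrag branch inside serializableReadCheck.
type healthNotifier struct{ isDefragActive bool }

func check(n *healthNotifier, skipCheckDuringDefrag bool) error {
	if n.isDefragActive {
		if skipCheckDuringDefrag {
			return nil // livez: the process is alive, merely busy defragmenting
		}
		return fmt.Errorf("defrag is active") // readyz: refuse traffic for now
	}
	return nil // the real check would issue a serializable range request here
}

func main() {
	n := &healthNotifier{isDefragActive: true}
	fmt.Println("livez: ", check(n, true))  // prints: livez:  <nil>
	fmt.Println("readyz:", check(n, false)) // prints: readyz: defrag is active
}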
40 changes: 34 additions & 6 deletions server/etcdserver/api/etcdhttp/health_test.go
@@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/schema"
)
@@ -61,6 +62,8 @@ func (s *fakeHealthServer) Leader() types.ID {

func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *fakeHealthServer) Backend() backend.Backend { return nil }

func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }

type healthTestCase struct {
@@ -70,9 +73,10 @@ type healthTestCase struct {
inResult []string
notInResult []string

alarms []*pb.AlarmMember
apiError error
missingLeader bool
alarms []*pb.AlarmMember
apiError error
missingLeader bool
isDefragActive bool
}

func TestHealthHandler(t *testing.T) {
@@ -190,7 +194,10 @@ func TestHttpSubPath(t *testing.T) {
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
@@ -244,7 +251,10 @@ func TestDataCorruptionCheck(t *testing.T) {
s := &fakeHealthServer{
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
// OK before alarms are activated.
@@ -280,6 +290,21 @@ func TestSerializableReadCheck(t *testing.T) {
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
{
name: "Alive if defrag is active and range api is not available",
healthCheckURL: "/livez",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if defrag is active and range api is not available",
healthCheckURL: "/readyz",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: defrag is active"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -289,7 +314,10 @@
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
12 changes: 4 additions & 8 deletions server/etcdserver/api/v3rpc/health.go
@@ -20,18 +20,14 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/backend"
)

const (
allGRPCServices = ""
)

type notifier interface {
defragStarted()
defragFinished()
}

func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) backend.DefragNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
@@ -49,14 +45,14 @@ type healthNotifier struct {
stopGRPCServiceOnDefrag bool
}

func (hc *healthNotifier) defragStarted() {
func (hc *healthNotifier) DefragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }
func (hc *healthNotifier) DefragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
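Note: since this healthNotifier now satisfies backend.DefragNotifier instead of a package-local interface, a compile-time assertion would document the contract. A suggested one-liner, not part of this commit:

// compile-time check that healthNotifier implements backend.DefragNotifier
var _ backend.DefragNotifier = (*healthNotifier)(nil)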
9 changes: 3 additions & 6 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -74,22 +74,19 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server

healthNotifier notifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier backend.DefragNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
s.Backend().SubscribeDefragNotifier(healthNotifier)
return &authMaintenanceServer{srv, &AuthAdmin{s}}
}

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
ms.healthNotifier.defragStarted()
defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
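Note: with this change the notification rides on the backend's own defrag path rather than on the maintenance RPC, so every caller of Backend().Defrag() trips the subscribers, not only the gRPC API. A rough sketch of the resulting call flow, assuming no other entry points into defrag:

// maintenanceServer.Defragment
//   └─ Backend().Defrag()
//        └─ (*backend).defrag()
//             ├─ b.defragStarted()   // fans out DefragStarted to subscribers
//             ├─ ...bbolt defrag work...
//             └─ b.defragFinished()  // deferred; fans out DefragFinished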
34 changes: 34 additions & 0 deletions server/storage/backend/backend.go
@@ -71,6 +71,8 @@ type Backend interface {

// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())

SubscribeDefragNotifier(notifier DefragNotifier)
}

type Snapshot interface {
@@ -82,6 +84,11 @@
Close() error
}

type DefragNotifier interface {
DefragStarted()
DefragFinished()
}

type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
@@ -127,6 +134,9 @@ type backend struct {
txPostLockInsideApplyHook func()

lg *zap.Logger

defragNotifiers []DefragNotifier
notifierMu sync.RWMutex
}

type BackendConfig struct {
@@ -445,6 +455,27 @@ func (b *backend) Defrag() error {
return b.defrag()
}

func (b *backend) SubscribeDefragNotifier(notifier DefragNotifier) {
if notifier == nil {
return
}
b.notifierMu.Lock()
defer b.notifierMu.Unlock()
b.defragNotifiers = append(b.defragNotifiers, notifier)
}

func (b *backend) defragStarted() {
// hold notifierMu to avoid racing with SubscribeDefragNotifier.
b.notifierMu.RLock()
defer b.notifierMu.RUnlock()
for _, notifier := range b.defragNotifiers {
notifier.DefragStarted()
}
}

func (b *backend) defragFinished() {
b.notifierMu.RLock()
defer b.notifierMu.RUnlock()
for _, notifier := range b.defragNotifiers {
notifier.DefragFinished()
}
}

func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
@@ -459,6 +490,9 @@
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
// send notifications after acquiring the lock.
b.defragStarted()
defer b.defragFinished()

// block concurrent read requests while resetting tx
b.readTx.Lock()
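Note: as a usage illustration of the new interface, here is a stand-alone sketch of the subscribe/fan-out pattern. The loggingNotifier type is hypothetical; only the DefragNotifier shape comes from the diff above.

package main

import "fmt"

// DefragNotifier mirrors the interface added to backend.go.
type DefragNotifier interface {
	DefragStarted()
	DefragFinished()
}

// loggingNotifier is a hypothetical subscriber that just prints the events.
type loggingNotifier struct{}

func (loggingNotifier) DefragStarted()  { fmt.Println("defrag started") }
func (loggingNotifier) DefragFinished() { fmt.Println("defrag finished") }

func main() {
	// mimics (*backend).defragStarted and defragFinished fanning out to subscribers.
	subscribers := []DefragNotifier{loggingNotifier{}}
	for _, n := range subscribers {
		n.DefragStarted()
	}
	// ...defrag work would happen here...
	for _, n := range subscribers {
		n.DefragFinished()
	}
}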
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_test.go
@@ -977,6 +977,7 @@ func (b *fakeBackend) ForceCommit()
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
func (b *fakeBackend) SubscribeDefragNotifier(backend.DefragNotifier) {}

type indexGetResp struct {
rev Revision
31 changes: 29 additions & 2 deletions tests/e2e/http_health_check_test.go
@@ -37,8 +37,9 @@ import (
)

const (
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
defragCommandTimeout = 200 * time.Millisecond
)

type healthCheckConfig struct {
@@ -279,6 +280,27 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
},
},
},
// verify that when defrag is active livez is ok and readyz is not ok.
{
name: "defrag active",
injectFailure: triggerSlowDefrag,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
expectedRespSubStrings: []string{
`[-]serializable_read failed: defrag is active`,
`[+]data_corruption ok`,
},
},
},
},
}

for _, tc := range tcs {
@@ -375,6 +397,11 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: putCommandTimeout})
}

func triggerSlowDefrag(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "defragBeforeRename", fmt.Sprintf(`sleep("%s")`, duration)))
clus.Procs[0].Etcdctl().Defragment(ctx, config.DefragOption{Timeout: defragCommandTimeout})
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := clus.Procs[0].Etcdctl()
for i := 0; i < 10; i++ {
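Note: the probes this e2e case performs while the defragBeforeRename failpoint keeps defrag sleeping amount to plain HTTP GETs. A hedged sketch; the client endpoint address is an assumption:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// While defrag is held open by the failpoint, /livez should answer 200
	// and /readyz should answer 503 with "defrag is active" in the body.
	for _, path := range []string{"/livez", "/readyz"} {
		resp, err := http.Get("http://127.0.0.1:2379" + path) // address is an assumption
		if err != nil {
			fmt.Println(path, "error:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Println(path, resp.StatusCode, string(body))
	}
}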
