etcdserver: make livez return ok when defrag is active. #16858

Open · wants to merge 1 commit into base: main
46 changes: 39 additions & 7 deletions server/etcdserver/api/etcdhttp/health.go
@@ -25,6 +25,7 @@ import (
"net/http"
"path"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -33,6 +34,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/raft/v3"
)

@@ -52,11 +54,16 @@ type ServerHealth interface {
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
Backend() backend.Backend
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
notifier := &healthNotifier{}
if srv.Backend() != nil {
srv.Backend().SubscribeDefragNotifier(notifier)
}
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
return h
@@ -67,8 +74,8 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
return checkAPI(ctx, lg, srv, serializable)
}))

installLivezEndpoints(lg, mux, srv)
installReadyzEndpoints(lg, mux, srv)
installLivezEndpoints(lg, mux, srv, notifier)
installReadyzEndpoints(lg, mux, srv, notifier)
}

// NewHealthHandler handles '/health' requests.
@@ -237,16 +244,16 @@ type CheckRegistry struct {
checks map[string]HealthCheck
}

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth, notifier *healthNotifier) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, notifier, true /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth, notifier *healthNotifier) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", serializableReadCheck(server, notifier, false /* skipCheckDuringDefrag */))
reg.InstallHttpEndpoints(lg, mux)
}

@@ -410,8 +417,33 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
type healthNotifier struct {
lock sync.RWMutex
isDefragActive bool
}

func (n *healthNotifier) DefragStarted() {
n.lock.Lock()
defer n.lock.Unlock()
n.isDefragActive = true
}

func (n *healthNotifier) DefragFinished() {
n.lock.Lock()
defer n.lock.Unlock()
n.isDefragActive = false
}

func serializableReadCheck(srv ServerHealth, notifier *healthNotifier, skipCheckDuringDefrag bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
// Read the defrag flag under the notifier's lock to avoid a data race with
// DefragStarted/DefragFinished.
notifier.lock.RLock()
defragActive := notifier.isDefragActive
notifier.lock.RUnlock()
// livez: skip the read check and report healthy while defrag is active.
if skipCheckDuringDefrag && defragActive {
return nil
}
// readyz: report not ready while defrag is active.
if defragActive {
return fmt.Errorf("defrag is active")
}
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
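For context, a minimal standalone probe (not part of this diff; the endpoint address and timeout are assumptions) that exercises the behavior this change is after: while a defrag is running, /livez should keep answering 200 because the serializable read check is skipped, while /readyz should answer 503 with a body mentioning "defrag is active".

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

// probe issues one GET against an etcd member's health endpoint and prints
// the status code plus body.
func probe(client *http.Client, base, path string) {
	resp, err := client.Get(base + path)
	if err != nil {
		fmt.Printf("%s: request failed: %v\n", path, err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s -> %d\n%s\n", path, resp.StatusCode, body)
}

func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	base := "http://127.0.0.1:2379" // assumed client URL of a local member

	// Expected while a defrag is in progress:
	//   /livez  -> 200 (the member is alive, the check is skipped)
	//   /readyz -> 503 (serializable_read reports "defrag is active")
	probe(client, base, "/livez")
	probe(client, base, "/readyz")
}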
40 changes: 34 additions & 6 deletions server/etcdserver/api/etcdhttp/health_test.go
@@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/schema"
)
@@ -61,6 +62,8 @@ func (s *fakeHealthServer) Leader() types.ID {

func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *fakeHealthServer) Backend() backend.Backend { return nil }

func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }

type healthTestCase struct {
@@ -70,9 +73,10 @@ type healthTestCase struct {
inResult []string
notInResult []string

alarms []*pb.AlarmMember
apiError error
missingLeader bool
alarms []*pb.AlarmMember
apiError error
missingLeader bool
isDefragActive bool
}

func TestHealthHandler(t *testing.T) {
@@ -190,7 +194,10 @@ func TestHttpSubPath(t *testing.T) {
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
@@ -244,7 +251,10 @@ func TestDataCorruptionCheck(t *testing.T) {
s := &fakeHealthServer{
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
// OK before alarms are activated.
@@ -280,6 +290,21 @@ func TestSerializableReadCheck(t *testing.T) {
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
{
name: "Alive if defrag is active and range api is not available",
healthCheckURL: "/livez",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if defrag is active and range api is not available",
healthCheckURL: "/readyz",
isDefragActive: true,
apiError: fmt.Errorf("timeout error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: defrag is active"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -289,7 +314,10 @@
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
notifier := &healthNotifier{isDefragActive: tt.isDefragActive}
installLivezEndpoints(logger, mux, s, notifier)
installReadyzEndpoints(logger, mux, s, notifier)

ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
12 changes: 4 additions & 8 deletions server/etcdserver/api/v3rpc/health.go
@@ -20,18 +20,14 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/backend"
)

const (
allGRPCServices = ""
)

type notifier interface {
defragStarted()
defragFinished()
}

func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) backend.DefragNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
@@ -49,14 +45,14 @@ type healthNotifier struct {
stopGRPCServiceOnDefrag bool
}

func (hc *healthNotifier) defragStarted() {
func (hc *healthNotifier) DefragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }
func (hc *healthNotifier) DefragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
9 changes: 3 additions & 6 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -74,22 +74,19 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server

healthNotifier notifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier backend.DefragNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
s.Backend().SubscribeDefragNotifier(healthNotifier)
return &authMaintenanceServer{srv, &AuthAdmin{s}}
}

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
Member
Why move notification from here?

Contributor Author
The original notifier is called in grpc server.

  1. It is strange to have a http endpoint depend on grpc server calls. The cluster could be started with grpcEnabled=false.
  2. There could be 2 grpc servers, 1 for insecure and 1 for secure. Which one should be used to determine the http endpoint state?
Based on these reasons, I think it is better to move the notifiers to the backend.

Member (@serathius, Nov 16, 2023)
Ok, but this is an unrelated change. Still, it doesn't solve the issue that there might be multiple calls to Defrag that will be blocked somewhere in the internals. This still leaves us open to concurrency issues. Let's just use the notifier as it is and do a follow-up issue to fix the notifier.

Contributor Author (@siyuanfoundation, Nov 16, 2023)
tbh, it is very gnarly to pass notifiers between http and grpc, something like https://github.com/etcd-io/etcd/compare/main...siyuanfoundation:etcd:defrag2?expand=1.

How about I start a PR #16959 to move the notifiers to the backend first?

ms.healthNotifier.defragStarted()
defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
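To make the design discussed in the review thread above concrete, here is a self-contained sketch (illustrative names only, not etcd's actual wiring) of the backend-subscription model this PR moves toward: the backend fans defrag start/finish events out to any number of subscribers, so the HTTP health checks and the gRPC health service each react on their own, and the HTTP endpoints no longer depend on the gRPC server being enabled.

package main

import (
	"fmt"
	"sync"
)

// DefragNotifier mirrors the interface added in this PR: subscribers are told
// when a defragmentation starts and when it finishes.
type DefragNotifier interface {
	DefragStarted()
	DefragFinished()
}

// fakeBackend owns the subscriber list and fans notifications out around Defrag.
type fakeBackend struct {
	mu        sync.RWMutex
	notifiers []DefragNotifier
}

func (b *fakeBackend) SubscribeDefragNotifier(n DefragNotifier) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.notifiers = append(b.notifiers, n)
}

func (b *fakeBackend) Defrag() {
	b.mu.RLock()
	subs := append([]DefragNotifier(nil), b.notifiers...)
	b.mu.RUnlock()
	for _, n := range subs {
		n.DefragStarted()
	}
	defer func() {
		for _, n := range subs {
			n.DefragFinished()
		}
	}()
	fmt.Println("defragmenting...") // the real backend rewrites the bbolt file here
}

// httpHealth plays the role of etcdhttp's healthNotifier: it only flips a flag
// that the /livez and /readyz checks consult.
type httpHealth struct {
	mu     sync.RWMutex
	active bool
}

func (h *httpHealth) DefragStarted()  { h.mu.Lock(); h.active = true; h.mu.Unlock() }
func (h *httpHealth) DefragFinished() { h.mu.Lock(); h.active = false; h.mu.Unlock() }

// grpcHealth plays the role of v3rpc's healthNotifier: it stops and resumes
// serving on the gRPC health service.
type grpcHealth struct{}

func (g *grpcHealth) DefragStarted()  { fmt.Println("grpc: stop serving (defrag is active)") }
func (g *grpcHealth) DefragFinished() { fmt.Println("grpc: resume serving") }

func main() {
	be := &fakeBackend{}
	be.SubscribeDefragNotifier(&httpHealth{})
	be.SubscribeDefragNotifier(&grpcHealth{})
	be.Defrag()
}

With this shape, adding another consumer (for example a metrics gauge) is just another SubscribeDefragNotifier call; the maintenance server does not need to know who is listening.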
34 changes: 34 additions & 0 deletions server/storage/backend/backend.go
@@ -71,6 +71,8 @@ type Backend interface {

// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())

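// SubscribeDefragNotifier registers a notifier that is called back when an
// online defragmentation starts and finishes.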
SubscribeDefragNotifier(notifier DefragNotifier)
}

type Snapshot interface {
@@ -82,6 +84,11 @@
Close() error
}

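// DefragNotifier is implemented by components that want to be notified when
// the backend starts and finishes an online defragmentation.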
type DefragNotifier interface {
DefragStarted()
DefragFinished()
}

type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
@@ -127,6 +134,9 @@ type backend struct {
txPostLockInsideApplyHook func()

lg *zap.Logger

defragNotifiers []DefragNotifier
notifierMu sync.RWMutex
}

type BackendConfig struct {
@@ -445,6 +455,27 @@ func (b *backend) Defrag() error {
return b.defrag()
}

func (b *backend) SubscribeDefragNotifier(notifier DefragNotifier) {
if notifier == nil {
return
}
b.notifierMu.Lock()
defer b.notifierMu.Unlock()
b.defragNotifiers = append(b.defragNotifiers, notifier)
}

// defragStarted fans the start notification out to all subscribers.
func (b *backend) defragStarted() {
b.notifierMu.RLock()
defer b.notifierMu.RUnlock()
for _, notifier := range b.defragNotifiers {
notifier.DefragStarted()
}
}

// defragFinished fans the finish notification out to all subscribers.
func (b *backend) defragFinished() {
b.notifierMu.RLock()
defer b.notifierMu.RUnlock()
for _, notifier := range b.defragNotifiers {
notifier.DefragFinished()
}
}

func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
@@ -459,6 +490,9 @@
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
// send notifications after acquiring the lock.
b.defragStarted()
defer b.defragFinished()

// block concurrent read requests while resetting tx
b.readTx.Lock()
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_test.go
@@ -977,6 +977,7 @@ func (b *fakeBackend) ForceCommit()
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
func (b *fakeBackend) SubscribeDefragNotifier(backend.DefragNotifier) {}

type indexGetResp struct {
rev Revision
31 changes: 29 additions & 2 deletions tests/e2e/http_health_check_test.go
@@ -37,8 +37,9 @@ import (
)

const (
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
healthCheckTimeout = 2 * time.Second
putCommandTimeout = 200 * time.Millisecond
defragCommandTimeout = 200 * time.Millisecond
)

type healthCheckConfig struct {
@@ -279,6 +280,27 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
},
},
},
// verify that when defrag is active livez is ok and readyz is not ok.
{
name: "defrag active",
injectFailure: triggerSlowDefrag,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
expectedRespSubStrings: []string{
`[-]serializable_read failed: defrag is active`,
`[+]data_corruption ok`,
},
},
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -375,6 +397,11 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: putCommandTimeout})
}

func triggerSlowDefrag(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "defragBeforeRename", fmt.Sprintf(`sleep("%s")`, duration)))
clus.Procs[0].Etcdctl().Defragment(ctx, config.DefragOption{Timeout: defragCommandTimeout})
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := clus.Procs[0].Etcdctl()
for i := 0; i < 10; i++ {