Skip to content

Commit

Permalink
add livez/readyz for etcd
Browse files Browse the repository at this point in the history
Change-Id: Ia440a82b2bf3d275b7cd7d88b5a6e86fe9fe1c28

Signed-off-by: Han Kang <hankang@google.com>
Change-Id: Ief9475a92429be58eb7b1f96246bbdb00e996e75
  • Loading branch information
logicalhan committed Jun 5, 2023
1 parent cdbc2c1 commit 22f940c
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 11 deletions.
4 changes: 4 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ func (e *Etcd) serveClients() (err error) {
etcdhttp.HandleVersion(mux, e.Server)
etcdhttp.HandleMetrics(mux)
etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
etcdhttp.HandleLivez(e.cfg.logger, mux, e.Server)
etcdhttp.HandleReadyz(e.cfg.logger, mux, e.Server)

var gopts []grpc.ServerOption
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
Expand Down Expand Up @@ -831,6 +833,8 @@ func (e *Etcd) serveMetrics() (err error) {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetrics(metricsMux)
etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)
etcdhttp.HandleLivez(e.cfg.logger, metricsMux, e.Server)
etcdhttp.HandleReadyz(e.cfg.logger, metricsMux, e.Server)

for _, murl := range e.cfg.ListenMetricsUrls {
tlsInfo := &e.cfg.ClientTLSInfo
Expand Down
49 changes: 39 additions & 10 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"go.etcd.io/raft/v3"

"go.etcd.io/etcd/api/v3/etcdserverpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/raft/v3"
)

const (
PathHealth = "/health"
PathLivez = "/livez"
PathReadyz = "/readyz"
PathProxyHealth = "/proxy/health"
)

Expand All @@ -46,33 +49,59 @@ type ServerHealth interface {
// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool, endpoint string) Health {
if h := checkAlarms(lg, srv, excludedAlarms, endpoint); h.Health != "true" {
return h
}
if h := checkLeader(lg, srv, serializable); h.Health != "true" {
return h
}
return checkAPI(lg, srv, serializable)
}))
}, PathHealth))
}

// HandleLivez registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleLivez(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathLivez, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool, endpoint string) Health {
if h := checkAlarms(lg, srv, excludedAlarms, endpoint); h.Health != "true" {
return h
}
// TODO(logicalhan) should we require quorum for livez?
return checkAPI(lg, srv, serializable)
}, PathLivez, []string{etcdserverpb.AlarmType_NOSPACE.String()}...))
}

// HandleReadyz registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleReadyz(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathReadyz, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool, endpoint string) Health {
if h := checkAlarms(lg, srv, excludedAlarms, endpoint); h.Health != "true" {
return h
}
return checkAPI(lg, srv, serializable)
}, PathReadyz))
}

// NewHealthHandler handles '/health' requests.
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc {
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serializable bool, endpoint string) Health, endpoint string, alwaysExclude ...string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed))
lg.Warn(fmt.Sprintf("%s error", endpoint), zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
excludedAlarms := getExcludedAlarms(r)
for _, additionalExcludes := range alwaysExclude {
excludedAlarms[additionalExcludes] = struct{}{}
}
// Passing the query parameter "serializable=true" ensures that the
// health of the local etcd is checked vs the health of the cluster.
// This is useful for probes attempting to validate the liveness of
// the etcd process vs readiness of the cluster to serve requests.
serializableFlag := getSerializableFlag(r)
h := hfunc(excludedAlarms, serializableFlag)
h := hfunc(excludedAlarms, serializableFlag, endpoint)
defer func() {
if h.Health == "true" {
healthSuccess.Inc()
Expand All @@ -83,12 +112,12 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serial
d, _ := json.Marshal(h)
if h.Health != "true" {
http.Error(w, string(d), http.StatusServiceUnavailable)
lg.Warn("/health error", zap.String("output", string(d)), zap.Int("status-code", http.StatusServiceUnavailable))
lg.Warn(fmt.Sprintf("%s error", endpoint), zap.String("output", string(d)), zap.Int("status-code", http.StatusServiceUnavailable))
return
}
w.WriteHeader(http.StatusOK)
w.Write(d)
lg.Debug("/health OK", zap.Int("status-code", http.StatusOK))
lg.Debug(fmt.Sprintf("%s ok", endpoint), zap.Int("status-code", http.StatusOK))
}
}

Expand Down Expand Up @@ -141,7 +170,7 @@ func getSerializableFlag(r *http.Request) bool {

// TODO: etcdserver.ErrNoLeader in health API

func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet) Health {
func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet, healthType string) Health {
h := Health{Health: "true"}
as := srv.Alarms()
if len(as) > 0 {
Expand Down
219 changes: 218 additions & 1 deletion server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (

"go.uber.org/zap/zaptest"

"go.etcd.io/raft/v3"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/raft/v3"
)

type fakeStats struct{}
Expand Down Expand Up @@ -185,9 +186,225 @@ func TestHealthHandler(t *testing.T) {
}
}

func TestReadyzHandler(t *testing.T) {
// define the input and expected output
// input: alarms, and healthCheckURL
tests := []struct {
name string
alarms []*pb.AlarmMember
healthCheckURL string
apiError error

expectStatusCode int
expectHealth string
}{
{
name: "Healthy if no alarm",
alarms: []*pb.AlarmMember{},
healthCheckURL: "/readyz",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/readyz",
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Healthy if NOSPACE alarm is on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/readyz?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy if NOSPACE alarm is excluded",
alarms: []*pb.AlarmMember{},
healthCheckURL: "/readyz?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy if multiple NOSPACE alarms are on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/readyz?exclude=NOSPACE",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/readyz?exclude=NOSPACE",
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/readyz?exclude=NOSPACE&exclude=CORRUPT",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy even if authentication failed",
healthCheckURL: "/readyz",
apiError: auth.ErrUserEmpty,
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy even if authorization failed",
healthCheckURL: "/readyz",
apiError: auth.ErrPermissionDenied,
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
HandleReadyz(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
health: tt.expectHealth,
apiError: tt.apiError,
})
ts := httptest.NewServer(mux)
defer ts.Close()

res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)})
if err != nil {
t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err)
}
if res == nil {
t.Errorf("got nil http response with http request %s", tt.healthCheckURL)
return
}
if res.StatusCode != tt.expectStatusCode {
t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode)
}
health, err := parseHealthOutput(res.Body)
if err != nil {
t.Errorf("fail parse health check output %v", err)
}
if health.Health != tt.expectHealth {
t.Errorf("want health %s but got %s", tt.expectHealth, health.Health)
}
})
}
}

func TestLivezHandler(t *testing.T) {
// define the input and expected output
// input: alarms, and healthCheckURL
tests := []struct {
name string
alarms []*pb.AlarmMember
healthCheckURL string
apiError error

expectStatusCode int
expectHealth string
}{
{
name: "Healthy if no alarm",
alarms: []*pb.AlarmMember{},
healthCheckURL: "/livez",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarm is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}},
healthCheckURL: "/livez",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/livez?exclude=NOSPACE",
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
{
name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded",
alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
healthCheckURL: "/livez?exclude=NOSPACE&exclude=CORRUPT",
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy even if authentication failed",
healthCheckURL: "/livez",
apiError: auth.ErrUserEmpty,
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Healthy even if authorization failed",
healthCheckURL: "/livez",
apiError: auth.ErrPermissionDenied,
expectStatusCode: http.StatusOK,
expectHealth: "true",
},
{
name: "Unhealthy if api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
expectHealth: "false",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
HandleLivez(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
health: tt.expectHealth,
apiError: tt.apiError,
})
ts := httptest.NewServer(mux)
defer ts.Close()

res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)})
if err != nil {
t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err)
}
if res == nil {
t.Errorf("got nil http response with http request %s", tt.healthCheckURL)
return
}
if res.StatusCode != tt.expectStatusCode {
t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode)
}

health, err := parseHealthOutput(res.Body)

if err != nil {
t.Errorf("fail parse health check output %v", err)
}
if health.Health != tt.expectHealth {
t.Errorf("want health %s but got %s", tt.expectHealth, health.Health)
}
})
}
}

func parseHealthOutput(body io.Reader) (Health, error) {
obj := Health{}
d, derr := io.ReadAll(body)
println(string(d))
if derr != nil {
return obj, derr
}
Expand Down
2 changes: 2 additions & 0 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ func (m *Member) Launch() error {
etcdhttp.HandleVersion(handler, m.Server)
etcdhttp.HandleMetrics(handler)
etcdhttp.HandleHealth(m.Logger, handler, m.Server)
etcdhttp.HandleLivez(m.Logger, handler, m.Server)
etcdhttp.HandleReadyz(m.Logger, handler, m.Server)
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{
Expand Down

0 comments on commit 22f940c

Please sign in to comment.