Skip to content

Commit

Permalink
tools: improve flow update (#7791)
Browse files Browse the repository at this point in the history
ref #7703

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 4, 2024
1 parent 6d25ef1 commit 4975890
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 61 deletions.
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
ClusterStatus = "/pd/api/v1/cluster/status"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
)
Expand Down
9 changes: 9 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Client interface {
GetPDVersion(context.Context) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]string, error)
DeleteOperators(context.Context) error

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
Expand Down Expand Up @@ -879,3 +880,11 @@ func (c *client) GetPDVersion(ctx context.Context) (string, error) {
WithResp(&ver))
return ver.Version, err
}

// DeleteOperators deletes the running operators.
func (c *client) DeleteOperators(ctx context.Context) error {
return c.request(ctx, newRequestInfo().
WithName(deleteOperators).
WithURI(operators).
WithMethod(http.MethodDelete))
}
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
resetBaseAllocIDName = "ResetBaseAllocID"
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
deleteOperators = "DeleteOperators"
)

type requestInfo struct {
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func changeLogLevel(c *gin.Context) {
// @Success 200 {string} string "Success"
// @Failure 400 {string} error
// @Failure 500 {string} error
// @Router /config/group [POST]
// @Router /config/group [post]
func (s *Service) postResourceGroup(c *gin.Context) {
var group rmpb.ResourceGroup
if err := c.ShouldBindJSON(&group); err != nil {
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *Service) putResourceGroup(c *gin.Context) {
// @Failure 404 {string} error
// @Param name path string true "groupName"
// @Param with_stats query bool false "whether to return statistics data."
// @Router /config/group/{name} [GET]
// @Router /config/group/{name} [get]
func (s *Service) getResourceGroup(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
group := s.manager.GetResourceGroup(c.Param("name"), withStats)
Expand All @@ -198,7 +198,7 @@ func (s *Service) getResourceGroup(c *gin.Context) {
// @Success 200 {string} json format of []rmserver.ResourceGroup
// @Failure 404 {string} error
// @Param with_stats query bool false "whether to return statistics data."
// @Router /config/groups [GET]
// @Router /config/groups [get]
func (s *Service) getResourceGroupList(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
groups := s.manager.GetResourceGroupList(withStats)
Expand All @@ -212,7 +212,7 @@ func (s *Service) getResourceGroupList(c *gin.Context) {
// @Param name path string true "Name of the resource group to be deleted"
// @Success 200 {string} string "Success!"
// @Failure 404 {string} error
// @Router /config/group/{name} [DELETE]
// @Router /config/group/{name} [delete]
func (s *Service) deleteResourceGroup(c *gin.Context) {
if err := s.manager.DeleteResourceGroup(c.Param("name")); err != nil {
c.String(http.StatusNotFound, err.Error())
Expand All @@ -226,7 +226,7 @@ func (s *Service) deleteResourceGroup(c *gin.Context) {
// @Summary Get the resource controller config.
// @Success 200 {string} json format of rmserver.ControllerConfig
// @Failure 400 {string} error
// @Router /config/controller [GET]
// @Router /config/controller [get]
func (s *Service) getControllerConfig(c *gin.Context) {
config := s.manager.GetControllerConfig()
c.IndentedJSON(http.StatusOK, config)
Expand All @@ -239,7 +239,7 @@ func (s *Service) getControllerConfig(c *gin.Context) {
// @Param config body object true "json params, rmserver.ControllerConfig"
// @Success 200 {string} string "Success!"
// @Failure 400 {string} error
// @Router /config/controller [POST]
// @Router /config/controller [post]
func (s *Service) setControllerConfig(c *gin.Context) {
conf := make(map[string]any)
if err := c.ShouldBindJSON(&conf); err != nil {
Expand Down
21 changes: 19 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.POST("", createOperator)
router.DELETE("", deleteOperators)
router.GET("/:id", getOperatorByRegion)
router.DELETE("/:id", deleteOperatorByRegion)
router.GET("/records", getOperatorRecords)
Expand Down Expand Up @@ -307,7 +308,7 @@ func deleteRegionCacheByID(c *gin.Context) {
// @Success 200 {object} operator.OpWithStatus
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
// @Router /operators/{id} [get]
func getOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")
Expand All @@ -334,7 +335,7 @@ func getOperatorByRegion(c *gin.Context) {
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
// @Router /operators [get]
func getOperators(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var (
Expand Down Expand Up @@ -365,6 +366,22 @@ func getOperators(c *gin.Context) {
}
}

// @Tags operators
// @Summary Delete operators.
// @Produce json
// @Success 200 {string} string "All pending operator are canceled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [delete]
func deleteOperators(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
if err := handler.RemoveOperators(); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.String(http.StatusOK, "All pending operator are canceled.")
}

// @Tags operator
// @Summary Cancel a Region's pending operator.
// @Param region_id path int true "A Region's Id"
Expand Down
11 changes: 11 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return nil
}

// RemoveOperators removes the all operators.
func (h *Handler) RemoveOperators() error {
c, err := h.GetOperatorController()
if err != nil {
return err
}

c.RemoveOperators(operator.AdminStop)
return nil
}

// GetOperators returns the running operators.
func (h *Handler) GetOperators() ([]*operator.Operator, error) {
c, err := h.GetOperatorController()
Expand Down
35 changes: 35 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ func (oc *Controller) ack(op *Operator) {
}
}

// RemoveOperators removes all operators from the running operators.
func (oc *Controller) RemoveOperators(reasons ...CancelReasonType) {
oc.Lock()
removed := oc.removeOperatorsLocked()
oc.Unlock()
var cancelReason CancelReasonType
if len(reasons) > 0 {
cancelReason = reasons[0]
}
for _, op := range removed {
if op.Cancel(cancelReason) {
log.Info("operator removed",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
}
oc.buryOperator(op)
}
}

func (oc *Controller) removeOperatorsLocked() []*Operator {
var removed []*Operator
for regionID, op := range oc.operators {
delete(oc.operators, regionID)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
oc.ack(op)
if op.Kind()&OpMerge != 0 {
oc.removeRelatedMergeOperator(op)
}
removed = append(removed, op)
}
oc.updateCounts(oc.operators)
return removed
}

// RemoveOperator removes an operator from the running operators.
func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool {
oc.Lock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func StringContain(re *require.Assertions, sub string) func([]byte, int, http.He
}
}

// StringNotContain is used to check whether response context doesn't contain given string.
func StringNotContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(resp []byte, _ int, _ http.Header) {
re.NotContains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(resp []byte, _ int, _ http.Header) {
Expand Down
15 changes: 15 additions & 0 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) {
}
}

// @Tags operator
// @Summary Cancel all pending operators.
// @Produce json
// @Success 200 {string} string "All pending operators are canceled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [delete]
func (h *operatorHandler) DeleteOperators(w http.ResponseWriter, r *http.Request) {
if err := h.RemoveOperators(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.r.JSON(w, http.StatusOK, "All pending operators are canceled.")
}

// FIXME: details of input json body params
// @Tags operator
// @Summary Create an operator.
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
registerFunc(apiRouter, "/operators", operatorHandler.GetOperators, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.CreateOperator, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.DeleteOperators, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators/records", operatorHandler.GetOperatorRecords, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.GetOperatorsByRegion, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.DeleteOperatorByRegion, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
Expand Down
4 changes: 2 additions & 2 deletions server/api/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/rate-limit [POST]
// @Router /service-middleware/config/rate-limit [post]
func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]any
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
Expand Down Expand Up @@ -236,7 +236,7 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/grpc-rate-limit [POST]
// @Router /service-middleware/config/grpc-rate-limit [post]
func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]any
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/api/unsafe_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newUnsafeOperationHandler(svr *server.Server, rd *render.Render) *unsafeOpe
// Success 200 {string} string "Request has been accepted."
// Failure 400 {string} string "The input is invalid."
// Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/unsafe/remove-failed-stores [POST]
// @Router /admin/unsafe/remove-failed-stores [post]
func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
var input map[string]any
Expand Down Expand Up @@ -81,7 +81,7 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
// @Summary Show the current status of failed stores removal.
// @Produce json
// Success 200 {object} []StageOutput
// @Router /admin/unsafe/remove-failed-stores/show [GET]
// @Router /admin/unsafe/remove-failed-stores/show [get]
func (h *unsafeOperationHandler) GetFailedStoresRemovalStatus(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
h.rd.JSON(w, http.StatusOK, rc.GetUnsafeRecoveryController().Show())
Expand Down
53 changes: 53 additions & 0 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,56 @@ func (suite *operatorTestSuite) pauseRuleChecker(re *require.Assertions, cluster
re.NoError(err)
re.True(resp["paused"].(bool))
}

func (suite *operatorTestSuite) TestRemoveOperators() {
suite.env.RunTestInTwoModes(suite.checkRemoveOperators)
}

func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster) {
re := suite.Require()
stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
}

for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}

suite.pauseRuleChecker(re, cluster)
r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
tests.MustPutRegionInfo(re, cluster, r2)
r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
tests.MustPutRegionInfo(re, cluster, r3)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re))
re.NoError(err)
err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 30, "store_id": 4}`), tu.StatusOK(re))
re.NoError(err)
url := fmt.Sprintf("%s/operators", urlPrefix)
err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringContain(re, "merge: region 10 to 20"), tu.StringContain(re, "add peer: store [4]"))
re.NoError(err)
err = tu.CheckDelete(testDialClient, url, tu.StatusOK(re))
re.NoError(err)
err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringNotContain(re, "merge: region 10 to 20"), tu.StringNotContain(re, "add peer: store [4]"))
re.NoError(err)
}
1 change: 0 additions & 1 deletion tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ round = 0
store-count = 100
region-count = 2000000

key-length = 56
replica = 3

leader-update-ratio = 0.06
Expand Down
Loading

0 comments on commit 4975890

Please sign in to comment.