diff --git a/client/http/api.go b/client/http/api.go index bd0bf830a1f..2cd7fee89c2 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -77,6 +77,7 @@ const ( ClusterStatus = "/pd/api/v1/cluster/status" Status = "/pd/api/v1/status" Version = "/pd/api/v1/version" + operators = "/pd/api/v1/operators" // Micro Service microServicePrefix = "/pd/api/v2/ms" ) diff --git a/client/http/interface.go b/client/http/interface.go index c9b212f565f..47bf5cc58e1 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -92,6 +92,7 @@ type Client interface { GetPDVersion(context.Context) (string, error) /* Micro Service interfaces */ GetMicroServiceMembers(context.Context, string) ([]string, error) + DeleteOperators(context.Context) error /* Client-related methods */ // WithCallerID sets and returns a new client with the given caller ID. @@ -879,3 +880,11 @@ func (c *client) GetPDVersion(ctx context.Context) (string, error) { WithResp(&ver)) return ver.Version, err } + +// DeleteOperators deletes the running operators. +func (c *client) DeleteOperators(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteOperators). + WithURI(operators). 
+ WithMethod(http.MethodDelete)) +} diff --git a/client/http/request_info.go b/client/http/request_info.go index 132841ffb0b..e5e8f57724c 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -75,6 +75,7 @@ const ( resetBaseAllocIDName = "ResetBaseAllocID" setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark" deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark" + deleteOperators = "DeleteOperators" ) type requestInfo struct { diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 64e2f9edb47..3fd94e637d0 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -137,7 +137,7 @@ func changeLogLevel(c *gin.Context) { // @Success 200 {string} string "Success" // @Failure 400 {string} error // @Failure 500 {string} error -// @Router /config/group [POST] +// @Router /config/group [post] func (s *Service) postResourceGroup(c *gin.Context) { var group rmpb.ResourceGroup if err := c.ShouldBindJSON(&group); err != nil { @@ -181,7 +181,7 @@ func (s *Service) putResourceGroup(c *gin.Context) { // @Failure 404 {string} error // @Param name path string true "groupName" // @Param with_stats query bool false "whether to return statistics data." -// @Router /config/group/{name} [GET] +// @Router /config/group/{name} [get] func (s *Service) getResourceGroup(c *gin.Context) { withStats := strings.EqualFold(c.Query("with_stats"), "true") group := s.manager.GetResourceGroup(c.Param("name"), withStats) @@ -198,7 +198,7 @@ func (s *Service) getResourceGroup(c *gin.Context) { // @Success 200 {string} json format of []rmserver.ResourceGroup // @Failure 404 {string} error // @Param with_stats query bool false "whether to return statistics data." 
-// @Router /config/groups [GET] +// @Router /config/groups [get] func (s *Service) getResourceGroupList(c *gin.Context) { withStats := strings.EqualFold(c.Query("with_stats"), "true") groups := s.manager.GetResourceGroupList(withStats) @@ -212,7 +212,7 @@ func (s *Service) getResourceGroupList(c *gin.Context) { // @Param name path string true "Name of the resource group to be deleted" // @Success 200 {string} string "Success!" // @Failure 404 {string} error -// @Router /config/group/{name} [DELETE] +// @Router /config/group/{name} [delete] func (s *Service) deleteResourceGroup(c *gin.Context) { if err := s.manager.DeleteResourceGroup(c.Param("name")); err != nil { c.String(http.StatusNotFound, err.Error()) @@ -226,7 +226,7 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { // @Summary Get the resource controller config. // @Success 200 {string} json format of rmserver.ControllerConfig // @Failure 400 {string} error -// @Router /config/controller [GET] +// @Router /config/controller [get] func (s *Service) getControllerConfig(c *gin.Context) { config := s.manager.GetControllerConfig() c.IndentedJSON(http.StatusOK, config) @@ -239,7 +239,7 @@ func (s *Service) getControllerConfig(c *gin.Context) { // @Param config body object true "json params, rmserver.ControllerConfig" // @Success 200 {string} string "Success!" 
// @Failure 400 {string} error -// @Router /config/controller [POST] +// @Router /config/controller [post] func (s *Service) setControllerConfig(c *gin.Context) { conf := make(map[string]any) if err := c.ShouldBindJSON(&conf); err != nil { diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 73a2bbed4b7..36451e5f031 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -164,6 +164,7 @@ func (s *Service) RegisterOperatorsRouter() { router := s.root.Group("operators") router.GET("", getOperators) router.POST("", createOperator) + router.DELETE("", deleteOperators) router.GET("/:id", getOperatorByRegion) router.DELETE("/:id", deleteOperatorByRegion) router.GET("/records", getOperatorRecords) @@ -307,7 +308,7 @@ func deleteRegionCacheByID(c *gin.Context) { // @Success 200 {object} operator.OpWithStatus // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "PD server failed to proceed the request." -// @Router /operators/{id} [GET] +// @Router /operators/{id} [get] func getOperatorByRegion(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) id := c.Param("id") @@ -334,7 +335,7 @@ func getOperatorByRegion(c *gin.Context) { // @Produce json // @Success 200 {array} operator.Operator // @Failure 500 {string} string "PD server failed to proceed the request." -// @Router /operators [GET] +// @Router /operators [get] func getOperators(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) var ( @@ -365,6 +366,22 @@ func getOperators(c *gin.Context) { } } +// @Tags operators +// @Summary Delete operators. +// @Produce json +// @Success 200 {string} string "All pending operator are canceled." +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /operators [delete] +func deleteOperators(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + if err := handler.RemoveOperators(); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + + c.String(http.StatusOK, "All pending operator are canceled.") +} + // @Tags operator // @Summary Cancel a Region's pending operator. // @Param region_id path int true "A Region's Id" diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 017289c81af..a9b89e4e3a4 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -133,6 +133,17 @@ func (h *Handler) RemoveOperator(regionID uint64) error { return nil } +// RemoveOperators removes all the operators. +func (h *Handler) RemoveOperators() error { + c, err := h.GetOperatorController() + if err != nil { + return err + } + + c.RemoveOperators(operator.AdminStop) + return nil +} + // GetOperators returns the running operators. func (h *Handler) GetOperators() ([]*operator.Operator, error) { c, err := h.GetOperatorController() diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index e3bead3ffca..baef0c6d564 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -536,6 +536,41 @@ func (oc *Controller) ack(op *Operator) { } } +// RemoveOperators removes all operators from the running operators. 
+func (oc *Controller) RemoveOperators(reasons ...CancelReasonType) { + oc.Lock() + removed := oc.removeOperatorsLocked() + oc.Unlock() + var cancelReason CancelReasonType + if len(reasons) > 0 { + cancelReason = reasons[0] + } + for _, op := range removed { + if op.Cancel(cancelReason) { + log.Info("operator removed", + zap.Uint64("region-id", op.RegionID()), + zap.Duration("takes", op.RunningTime()), + zap.Reflect("operator", op)) + } + oc.buryOperator(op) + } +} + +func (oc *Controller) removeOperatorsLocked() []*Operator { + var removed []*Operator + for regionID, op := range oc.operators { + delete(oc.operators, regionID) + operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() + oc.ack(op) + if op.Kind()&OpMerge != 0 { + oc.removeRelatedMergeOperator(op) + } + removed = append(removed, op) + } + oc.updateCounts(oc.operators) + return removed +} + // RemoveOperator removes an operator from the running operators. func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool { oc.Lock() diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index e4c58f68396..5356d18514b 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -56,6 +56,13 @@ func StringContain(re *require.Assertions, sub string) func([]byte, int, http.He } } +// StringNotContain is used to check whether response context doesn't contain given string. +func StringNotContain(re *require.Assertions, sub string) func([]byte, int, http.Header) { + return func(resp []byte, _ int, _ http.Header) { + re.NotContains(string(resp), sub, "resp: "+string(resp)) + } +} + // StringEqual is used to check whether response context equal given string. 
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) { return func(resp []byte, _ int, _ http.Header) { diff --git a/server/api/operator.go b/server/api/operator.go index f7b82b27552..c3d47d97421 100644 --- a/server/api/operator.go +++ b/server/api/operator.go @@ -101,6 +101,21 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) { } } +// @Tags operator +// @Summary Cancel all pending operators. +// @Produce json +// @Success 200 {string} string "All pending operators are canceled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators [delete] +func (h *operatorHandler) DeleteOperators(w http.ResponseWriter, r *http.Request) { + if err := h.RemoveOperators(); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + h.r.JSON(w, http.StatusOK, "All pending operators are canceled.") +} + // FIXME: details of input json body params // @Tags operator // @Summary Create an operator. 
diff --git a/server/api/router.go b/server/api/router.go index 705e4da0959..58fc152ddb1 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -133,6 +133,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { operatorHandler := newOperatorHandler(handler, rd) registerFunc(apiRouter, "/operators", operatorHandler.GetOperators, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/operators", operatorHandler.CreateOperator, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/operators", operatorHandler.DeleteOperators, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/operators/records", operatorHandler.GetOperatorRecords, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.GetOperatorsByRegion, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.DeleteOperatorByRegion, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) diff --git a/server/api/service_middleware.go b/server/api/service_middleware.go index 5be15c81550..3d9ee6c3e2a 100644 --- a/server/api/service_middleware.go +++ b/server/api/service_middleware.go @@ -140,7 +140,7 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC // @Success 200 {string} string // @Failure 400 {string} string "The input is invalid." 
// @Failure 500 {string} string "config item not found" -// @Router /service-middleware/config/rate-limit [POST] +// @Router /service-middleware/config/rate-limit [post] func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { @@ -236,7 +236,7 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r * // @Success 200 {string} string // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "config item not found" -// @Router /service-middleware/config/grpc-rate-limit [POST] +// @Router /service-middleware/config/grpc-rate-limit [post] func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { diff --git a/server/api/unsafe_operation.go b/server/api/unsafe_operation.go index b60afbf19d5..dc41ec336e3 100644 --- a/server/api/unsafe_operation.go +++ b/server/api/unsafe_operation.go @@ -43,7 +43,7 @@ func newUnsafeOperationHandler(svr *server.Server, rd *render.Render) *unsafeOpe // Success 200 {string} string "Request has been accepted." // Failure 400 {string} string "The input is invalid." // Failure 500 {string} string "PD server failed to proceed the request." -// @Router /admin/unsafe/remove-failed-stores [POST] +// @Router /admin/unsafe/remove-failed-stores [post] func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) var input map[string]any @@ -81,7 +81,7 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht // @Summary Show the current status of failed stores removal. 
// @Produce json // Success 200 {object} []StageOutput -// @Router /admin/unsafe/remove-failed-stores/show [GET] +// @Router /admin/unsafe/remove-failed-stores/show [get] func (h *unsafeOperationHandler) GetFailedStoresRemovalStatus(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) h.rd.JSON(w, http.StatusOK, rc.GetUnsafeRecoveryController().Show()) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index eb2ea5e1cd8..32ca4ea300d 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -624,3 +624,56 @@ func (suite *operatorTestSuite) pauseRuleChecker(re *require.Assertions, cluster re.NoError(err) re.True(resp["paused"].(bool)) } + +func (suite *operatorTestSuite) TestRemoveOperators() { + suite.env.RunTestInTwoModes(suite.checkRemoveOperators) +} + +func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster) { + re := suite.Require() + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + suite.pauseRuleChecker(re, cluster) + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), 
core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) + tests.MustPutRegionInfo(re, cluster, r3) + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 30, "store_id": 4}`), tu.StatusOK(re)) + re.NoError(err) + url := fmt.Sprintf("%s/operators", urlPrefix) + err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringContain(re, "merge: region 10 to 20"), tu.StringContain(re, "add peer: store [4]")) + re.NoError(err) + err = tu.CheckDelete(testDialClient, url, tu.StatusOK(re)) + re.NoError(err) + err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringNotContain(re, "merge: region 10 to 20"), tu.StringNotContain(re, "add peer: store [4]")) + re.NoError(err) +} diff --git a/tools/pd-heartbeat-bench/config-template.toml b/tools/pd-heartbeat-bench/config-template.toml index 4964535a772..73917c1425a 100644 --- a/tools/pd-heartbeat-bench/config-template.toml +++ b/tools/pd-heartbeat-bench/config-template.toml @@ -3,7 +3,6 @@ round = 0 store-count = 100 region-count = 2000000 -key-length = 56 replica = 3 leader-update-ratio = 0.06 diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index 90254014d82..d66f5b3139b 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -14,12 +14,12 @@ import ( const ( defaultStoreCount = 50 defaultRegionCount = 1000000 - defaultKeyLength = 56 + defaultHotStoreCount = 0 defaultReplica = 3 defaultLeaderUpdateRatio = 0.06 - defaultEpochUpdateRatio = 0.04 - defaultSpaceUpdateRatio = 0.15 - defaultFlowUpdateRatio = 0.35 + defaultEpochUpdateRatio = 0.0 + 
defaultSpaceUpdateRatio = 0.0 + defaultFlowUpdateRatio = 0.0 defaultReportRatio = 1 defaultRound = 0 defaultSample = false @@ -41,8 +41,8 @@ type Config struct { Security configutil.SecurityConfig `toml:"security" json:"security"` StoreCount int `toml:"store-count" json:"store-count"` + HotStoreCount int `toml:"hot-store-count" json:"hot-store-count"` RegionCount int `toml:"region-count" json:"region-count"` - KeyLength int `toml:"key-length" json:"key-length"` Replica int `toml:"replica" json:"replica"` LeaderUpdateRatio float64 `toml:"leader-update-ratio" json:"leader-update-ratio"` EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"` @@ -117,10 +117,9 @@ func (c *Config) Adjust(meta *toml.MetaData) { configutil.AdjustInt(&c.RegionCount, defaultRegionCount) } - if !meta.IsDefined("key-length") { - configutil.AdjustInt(&c.KeyLength, defaultKeyLength) + if !meta.IsDefined("hot-store-count") { + configutil.AdjustInt(&c.HotStoreCount, defaultHotStoreCount) } - if !meta.IsDefined("replica") { configutil.AdjustInt(&c.Replica, defaultReplica) } @@ -147,6 +146,9 @@ func (c *Config) Adjust(meta *toml.MetaData) { // Validate is used to validate configurations func (c *Config) Validate() error { + if c.HotStoreCount < 0 || c.HotStoreCount > c.StoreCount { + return errors.Errorf("hot-store-count must be in [0, store-count]") + } if c.ReportRatio < 0 || c.ReportRatio > 1 { return errors.Errorf("report-ratio must be in [0, 1]") } @@ -174,7 +176,8 @@ func (c *Config) Clone() *Config { // Options is the option of the heartbeat-bench. type Options struct { - ReportRatio atomic.Value + HotStoreCount atomic.Value + ReportRatio atomic.Value LeaderUpdateRatio atomic.Value EpochUpdateRatio atomic.Value @@ -185,6 +188,7 @@ type Options struct { // NewOptions creates a new option. 
func NewOptions(cfg *Config) *Options { o := &Options{} + o.HotStoreCount.Store(cfg.HotStoreCount) o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio) o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) @@ -193,6 +197,11 @@ func NewOptions(cfg *Config) *Options { return o } +// GetHotStoreCount returns the hot store count. +func (o *Options) GetHotStoreCount() int { + return o.HotStoreCount.Load().(int) +} + // GetLeaderUpdateRatio returns the leader update ratio. func (o *Options) GetLeaderUpdateRatio() float64 { return o.LeaderUpdateRatio.Load().(float64) @@ -220,6 +229,7 @@ func (o *Options) GetReportRatio() float64 { // SetOptions sets the option. func (o *Options) SetOptions(cfg *Config) { + o.HotStoreCount.Store(cfg.HotStoreCount) o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio) o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 52591b05770..5879cd307f0 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "crypto/tls" "fmt" "io" "math/rand" @@ -38,7 +39,11 @@ import ( "github.com/pingcap/log" "github.com/spf13/pflag" "github.com/tikv/pd/client/grpcutil" + pdHttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-heartbeat-bench/config" "go.etcd.io/etcd/pkg/report" @@ -46,9 +51,12 @@ import ( ) const ( - bytesUnit = 8 * units.MiB - keysUint = 8 * units.KiB - queryUnit = 1 * units.KiB + bytesUnit = 128 + keysUint = 8 + queryUnit = 8 + hotByteUnit = 16 * units.KiB + hotKeysUint = 256 + hotQueryUnit = 256 regionReportInterval = 60 // 60s storeReportInterval = 10 // 10s capacity = 4 * units.TiB @@ -168,18 +176,6 @@ func 
putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, store } } -func newStartKey(id uint64, keyLen int) []byte { - k := make([]byte, keyLen) - copy(k, fmt.Sprintf("%010d", id)) - return k -} - -func newEndKey(id uint64, keyLen int) []byte { - k := newStartKey(id, keyLen) - k[len(k)-1]++ - return k -} - // Regions simulates all regions to heartbeat. type Regions struct { regions []*pdpb.RegionHeartbeatRequest @@ -193,7 +189,7 @@ type Regions struct { updateFlow []int } -func (rs *Regions) init(cfg *config.Config, options *config.Options) []int { +func (rs *Regions) init(cfg *config.Config, options *config.Options) { rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, cfg.RegionCount) rs.updateRound = 0 @@ -201,14 +197,13 @@ func (rs *Regions) init(cfg *config.Config, options *config.Options) []int { id := uint64(1) now := uint64(time.Now().Unix()) - keyLen := cfg.KeyLength for i := 0; i < cfg.RegionCount; i++ { region := &pdpb.RegionHeartbeatRequest{ Header: header(), Region: &metapb.Region{ Id: id, - StartKey: newStartKey(id, keyLen), - EndKey: newEndKey(id, keyLen), + StartKey: codec.GenerateTableKey(int64(i)), + EndKey: codec.GenerateTableKey(int64(i + 1)), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}, }, ApproximateSize: bytesUnit, @@ -238,25 +233,23 @@ func (rs *Regions) init(cfg *config.Config, options *config.Options) []int { region.Leader = peers[0] rs.regions = append(rs.regions, region) } +} + +func (rs *Regions) update(cfg *config.Config, options *config.Options) { + rs.updateRound += 1 // Generate sample index indexes := make([]int, cfg.RegionCount) for i := range indexes { indexes[i] = i } - - return indexes -} - -func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) { - rs.updateRound += 1 - reportRegions := pick(indexes, cfg.RegionCount, options.GetReportRatio()) + reportCount := len(reportRegions) - rs.updateLeader = pick(reportRegions, reportCount, options.GetLeaderUpdateRatio()) - 
rs.updateEpoch = pick(reportRegions, reportCount, options.GetEpochUpdateRatio()) - rs.updateSpace = pick(reportRegions, reportCount, options.GetSpaceUpdateRatio()) rs.updateFlow = pick(reportRegions, reportCount, options.GetFlowUpdateRatio()) + rs.updateLeader = randomPick(reportRegions, reportCount, options.GetLeaderUpdateRatio()) + rs.updateEpoch = randomPick(reportRegions, reportCount, options.GetEpochUpdateRatio()) + rs.updateSpace = randomPick(reportRegions, reportCount, options.GetSpaceUpdateRatio()) var ( updatedStatisticsMap = make(map[int]*pdpb.RegionHeartbeatRequest) awakenRegions []*pdpb.RegionHeartbeatRequest @@ -281,13 +274,24 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [ // update flow for _, i := range rs.updateFlow { region := rs.regions[i] - region.BytesWritten = uint64(bytesUnit * rand.Float64()) - region.BytesRead = uint64(bytesUnit * rand.Float64()) - region.KeysWritten = uint64(keysUint * rand.Float64()) - region.KeysRead = uint64(keysUint * rand.Float64()) - region.QueryStats = &pdpb.QueryStats{ - Get: uint64(queryUnit * rand.Float64()), - Put: uint64(queryUnit * rand.Float64()), + if region.Leader.StoreId <= uint64(options.GetHotStoreCount()) { + region.BytesWritten = uint64(hotByteUnit * (1 + rand.Float64()) * 60) + region.BytesRead = uint64(hotByteUnit * (1 + rand.Float64()) * 10) + region.KeysWritten = uint64(hotKeysUint * (1 + rand.Float64()) * 60) + region.KeysRead = uint64(hotKeysUint * (1 + rand.Float64()) * 10) + region.QueryStats = &pdpb.QueryStats{ + Get: uint64(hotQueryUnit * (1 + rand.Float64()) * 10), + Put: uint64(hotQueryUnit * (1 + rand.Float64()) * 60), + } + } else { + region.BytesWritten = uint64(bytesUnit * rand.Float64()) + region.BytesRead = uint64(bytesUnit * rand.Float64()) + region.KeysWritten = uint64(keysUint * rand.Float64()) + region.KeysRead = uint64(keysUint * rand.Float64()) + region.QueryStats = &pdpb.QueryStats{ + Get: uint64(queryUnit * rand.Float64()), + Put: 
uint64(queryUnit * rand.Float64()), + } } updatedStatisticsMap[i] = region } @@ -438,15 +442,20 @@ func (s *Stores) update(rs *Regions) { } } -func pick(slice []int, total int, ratio float64) []int { +func randomPick(slice []int, total int, ratio float64) []int { rand.Shuffle(total, func(i, j int) { slice[i], slice[j] = slice[j], slice[i] }) return append(slice[:0:0], slice[0:int(float64(total)*ratio)]...) } +func pick(slice []int, total int, ratio float64) []int { + return append(slice[:0:0], slice[0:int(float64(total)*ratio)]...) +} + func main() { rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times + statistics.Denoising = false cfg := config.NewConfig() err := cfg.Parse(os.Args[1:]) defer logutil.LogPanic() @@ -487,16 +496,19 @@ func main() { if err != nil { log.Fatal("create client error", zap.Error(err)) } + initClusterID(ctx, cli) go runHTTPServer(cfg, options) regions := new(Regions) - indexes := regions.init(cfg, options) + regions.init(cfg, options) log.Info("finish init regions") stores := newStores(cfg.StoreCount) stores.update(regions) bootstrap(ctx, cli) putStores(ctx, cfg, cli, stores) log.Info("finish put stores") + httpCli := pdHttp.NewClient("tools-heartbeat-bench", []string{cfg.PDAddr}, pdHttp.WithTLSConfig(loadTLSConfig(cfg))) + go deleteOperators(ctx, httpCli) streams := make(map[uint64]pdpb.PD_RegionHeartbeatClient, cfg.StoreCount) for i := 1; i <= cfg.StoreCount; i++ { streams[uint64(i)] = createHeartbeatStream(ctx, cfg) @@ -533,7 +545,7 @@ func main() { zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), ) log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since))) - regions.update(cfg, options, indexes) + regions.update(cfg, options) go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update. 
case <-ctx.Done(): log.Info("got signal to exit") @@ -551,6 +563,22 @@ func exit(code int) { os.Exit(code) } +func deleteOperators(ctx context.Context, httpCli pdHttp.Client) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := httpCli.DeleteOperators(ctx) + if err != nil { + log.Error("fail to delete operators", zap.Error(err)) + } + } + } +} + func newReport(cfg *config.Config) report.Report { p := "%4.4f" if cfg.Sample { @@ -599,6 +627,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { pprof.Register(engine) engine.PUT("config", func(c *gin.Context) { newCfg := cfg.Clone() + newCfg.HotStoreCount = options.GetHotStoreCount() newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio() newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio() newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio() @@ -617,6 +646,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { }) engine.GET("config", func(c *gin.Context) { output := cfg.Clone() + output.HotStoreCount = options.GetHotStoreCount() output.FlowUpdateRatio = options.GetFlowUpdateRatio() output.LeaderUpdateRatio = options.GetLeaderUpdateRatio() output.EpochUpdateRatio = options.GetEpochUpdateRatio() @@ -627,3 +657,32 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { }) engine.Run(cfg.StatusAddr) } + +func loadTLSConfig(cfg *config.Config) *tls.Config { + if len(cfg.Security.CAPath) == 0 { + return nil + } + caData, err := os.ReadFile(cfg.Security.CAPath) + if err != nil { + log.Error("fail to read ca file", zap.Error(err)) + } + certData, err := os.ReadFile(cfg.Security.CertPath) + if err != nil { + log.Error("fail to read cert file", zap.Error(err)) + } + keyData, err := os.ReadFile(cfg.Security.KeyPath) + if err != nil { + log.Error("fail to read key file", zap.Error(err)) + } + + tlsConf, err := tlsutil.TLSConfig{ + SSLCABytes: caData, + SSLCertBytes: certData, + 
SSLKEYBytes: keyData, + }.ToTLSConfig() + if err != nil { + log.Fatal("failed to load tlc config", zap.Error(err)) + } + + return tlsConf +}