-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
topicctl new action: rebalance #142
Merged
ssingudasu
merged 21 commits into
master
from
ssingudasu/dp-1413_topiccl_new_action_rebalance
Jul 11, 2023
Merged
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
f4e1493
topicctl new action: rebalance
ssingudasu a548f44
topicctl new action: rebalance
ssingudasu 1289c36
topicctl new action: rebalance
ssingudasu afc0bb8
topicctl new action: rebalance
ssingudasu f24d375
topicctl new action: rebalance
ssingudasu 7f4d820
topicctl new action: rebalance
ssingudasu aa90036
topicctl new action: rebalance
ssingudasu 0662376
topicctl new action: rebalance
ssingudasu e0e1bed
topicctl new action: rebalance
ssingudasu 6f937b6
topicctl new action: rebalance
ssingudasu c71ade7
topicctl new action: rebalance
ssingudasu d7b35a5
topicctl new action: rebalance
ssingudasu 2a3cb3e
topicctl new action: rebalance
ssingudasu fc00a4d
topicctl new action: rebalance
ssingudasu e1b59d8
topicctl new action: rebalance
ssingudasu 842c06d
topicctl new action: rebalance
ssingudasu 3889be0
README updated with subcomand rebalance information
ssingudasu d005451
Updated topicctl version
ssingudasu 221a56d
README updated with subcomand rebalance information
ssingudasu 10d181b
README updated with subcomand rebalance information
ssingudasu 019ef11
README updated with subcommand rebalance information
ssingudasu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,363 @@ | ||
package subcmd | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/spf13/cobra" | ||
"os" | ||
"os/signal" | ||
"path/filepath" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/segmentio/topicctl/pkg/admin" | ||
"github.com/segmentio/topicctl/pkg/apply" | ||
"github.com/segmentio/topicctl/pkg/cli" | ||
"github.com/segmentio/topicctl/pkg/config" | ||
"github.com/segmentio/topicctl/pkg/util" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
var rebalanceCmd = &cobra.Command{ | ||
Use: "rebalance", | ||
Short: "rebalance all topics for a given topic prefix path", | ||
PreRunE: rebalancePreRun, | ||
RunE: rebalanceRun, | ||
} | ||
|
||
type rebalanceCmdConfig struct { | ||
brokersToRemove []int | ||
brokerThrottleMBsOverride int | ||
dryRun bool | ||
partitionBatchSizeOverride int | ||
pathPrefix string | ||
sleepLoopDuration time.Duration | ||
showProgressInterval time.Duration | ||
|
||
shared sharedOptions | ||
} | ||
|
||
var rebalanceConfig rebalanceCmdConfig | ||
|
||
func init() { | ||
rebalanceCmd.Flags().IntSliceVar( | ||
&rebalanceConfig.brokersToRemove, | ||
"to-remove", | ||
[]int{}, | ||
"Brokers to remove; only applies if rebalance is also set", | ||
) | ||
rebalanceCmd.Flags().IntVar( | ||
&rebalanceConfig.brokerThrottleMBsOverride, | ||
"broker-throttle-mb", | ||
0, | ||
"Broker throttle override (MB/sec)", | ||
) | ||
rebalanceCmd.Flags().BoolVar( | ||
&rebalanceConfig.dryRun, | ||
"dry-run", | ||
false, | ||
"Do a dry-run", | ||
) | ||
rebalanceCmd.Flags().IntVar( | ||
&rebalanceConfig.partitionBatchSizeOverride, | ||
"partition-batch-size", | ||
0, | ||
"Partition batch size override", | ||
) | ||
rebalanceCmd.Flags().StringVar( | ||
&rebalanceConfig.pathPrefix, | ||
"path-prefix", | ||
os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"), | ||
"Prefix for topic config paths", | ||
) | ||
rebalanceCmd.Flags().DurationVar( | ||
&rebalanceConfig.sleepLoopDuration, | ||
"sleep-loop-duration", | ||
10*time.Second, | ||
"Amount of time to wait between partition checks", | ||
) | ||
rebalanceCmd.Flags().DurationVar( | ||
&rebalanceConfig.showProgressInterval, | ||
"show-progress-interval", | ||
0*time.Second, | ||
"Interval of time to show progress during rebalance", | ||
) | ||
|
||
addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) | ||
RootCmd.AddCommand(rebalanceCmd) | ||
} | ||
|
||
func rebalancePreRun(cmd *cobra.Command, args []string) error { | ||
if rebalanceConfig.shared.clusterConfig == "" || rebalanceConfig.pathPrefix == "" { | ||
return fmt.Errorf("Requires args --cluster-config & --path-prefix (or) env variables TOPICCTL_CLUSTER_CONFIG & TOPICCTL_APPLY_PATH_PREFIX") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func rebalanceRun(cmd *cobra.Command, args []string) error { | ||
ctx := context.Background() | ||
rebalanceCtxStruct, err := getRebalanceCtxStruct(&rebalanceConfig) | ||
if err != nil { | ||
return err | ||
} | ||
ctx = context.WithValue(ctx, "progress", rebalanceCtxStruct) | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
|
||
sigChan := make(chan os.Signal, 1) | ||
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) | ||
go func() { | ||
// for an interrupt, cancel context and exit program to end all topic rebalances | ||
<-sigChan | ||
cancel() | ||
os.Exit(1) | ||
}() | ||
|
||
clusterConfigPath := rebalanceConfig.shared.clusterConfig | ||
topicConfigDir := rebalanceConfig.pathPrefix | ||
clusterConfig, err := config.LoadClusterFile(clusterConfigPath, rebalanceConfig.shared.expandEnv) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
adminClient, err := clusterConfig.NewAdminClient(ctx, | ||
nil, | ||
rebalanceConfig.dryRun, | ||
rebalanceConfig.shared.saslUsername, | ||
rebalanceConfig.shared.saslPassword, | ||
) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
defer adminClient.Close() | ||
|
||
// get all topic configs from --path-prefix i.e topics folder | ||
// we perform a recursive on the --path-prefix because there can be nested directories with | ||
// more topics for the --cluster-config | ||
// | ||
// NOTE: a topic file is ignored for rebalance if | ||
// - a file is not a valid topic yaml file | ||
// - any topic config is not consistent with cluster config | ||
log.Infof("Getting all topic configs from path prefix %v", topicConfigDir) | ||
topicFiles, err := getAllFiles(topicConfigDir) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// iterate through each topic config and initiate rebalance | ||
topicConfigs := []config.TopicConfig{} | ||
topicErrorDict := make(map[string]error) | ||
for _, topicFile := range topicFiles { | ||
// do not consider invalid topic yaml files for rebalance | ||
topicConfigs, err = config.LoadTopicsFile(topicFile) | ||
if err != nil { | ||
log.Errorf("Invalid topic yaml file: %s", topicFile) | ||
continue | ||
} | ||
|
||
for _, topicConfig := range topicConfigs { | ||
// topic config should be consistent with the cluster config | ||
if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil { | ||
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) | ||
continue | ||
} | ||
|
||
log.Infof( | ||
"Rebalancing topic %s from config file %s with cluster config %s", | ||
topicConfig.Meta.Name, | ||
topicFile, | ||
clusterConfigPath, | ||
) | ||
|
||
topicErrorDict[topicConfig.Meta.Name] = nil | ||
rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ | ||
TopicName: topicConfig.Meta.Name, | ||
ClusterName: clusterConfig.Meta.Name, | ||
ClusterEnvironment: clusterConfig.Meta.Environment, | ||
ToRemove: rebalanceConfig.brokersToRemove, | ||
RebalanceError: false, | ||
} | ||
if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { | ||
topicErrorDict[topicConfig.Meta.Name] = err | ||
rebalanceTopicProgressConfig.RebalanceError = true | ||
log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) | ||
} | ||
|
||
// show topic final progress | ||
if rebalanceCtxStruct.Enabled { | ||
progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) | ||
if err != nil { | ||
log.Errorf("progress struct to string error: %+v", err) | ||
} else { | ||
log.Infof("Rebalance Progress: %s", progressStr) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// audit at the end of all topic rebalances | ||
successTopics := 0 | ||
errorTopics := 0 | ||
for thisTopicName, thisTopicError := range topicErrorDict { | ||
if thisTopicError != nil { | ||
errorTopics += 1 | ||
log.Errorf("topic: %s rebalance failed with error: %v", thisTopicName, thisTopicError) | ||
} else { | ||
log.Infof("topic: %s rebalance is successful", thisTopicName) | ||
successTopics += 1 | ||
} | ||
} | ||
|
||
// show overall rebalance summary report | ||
if rebalanceCtxStruct.Enabled { | ||
progressStr, err := util.StructToStr(util.RebalanceProgressConfig{ | ||
SuccessTopics: successTopics, | ||
ErrorTopics: errorTopics, | ||
ClusterName: clusterConfig.Meta.Name, | ||
ClusterEnvironment: clusterConfig.Meta.Environment, | ||
ToRemove: rebalanceConfig.brokersToRemove, | ||
}) | ||
if err != nil { | ||
log.Errorf("progress struct to string error: %+v", err) | ||
} else { | ||
log.Infof("Rebalance Progress: %s", progressStr) | ||
} | ||
} | ||
|
||
log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) | ||
return nil | ||
} | ||
|
||
// Check whether a topic is a candidate for action rebalance | ||
// settings(partitions, retention time) of topic config with settings for topic in the cluster | ||
func rebalanceTopicCheck( | ||
topicConfig config.TopicConfig, | ||
topicInfo admin.TopicInfo, | ||
) error { | ||
log.Debugf("Check topic partitions...") | ||
if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { | ||
return fmt.Errorf("Topic partitions in kafka: %d does not match with topic config: %d", | ||
len(topicInfo.Partitions), | ||
topicConfig.Spec.Partitions, | ||
) | ||
} | ||
|
||
log.Debugf("Check topic retention.ms...") | ||
topicInfoRetentionMs := topicInfo.Config["retention.ms"] | ||
topicConfigRetentionMs := strconv.Itoa(topicConfig.Spec.RetentionMinutes * 60000) | ||
if topicInfoRetentionMs == "" { | ||
topicInfoRetentionMs = strconv.Itoa(0) | ||
} | ||
if topicInfoRetentionMs != topicConfigRetentionMs { | ||
return fmt.Errorf("Topic retention in kafka: %s does not match with topic config: %s", | ||
topicInfoRetentionMs, | ||
topicConfigRetentionMs, | ||
) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Perform rebalance on a topic. returns error if unsuccessful | ||
// topic will not be rebalanced if | ||
// - partitions of a topic in kafka cluster does not match with topic partition setting in topic config | ||
// - retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config | ||
// | ||
// to ensure there are no disruptions to kafka cluster | ||
// | ||
// NOTE: topic that is not present in kafka cluster will not be applied | ||
func rebalanceApplyTopic( | ||
petedannemann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx context.Context, | ||
topicConfig config.TopicConfig, | ||
clusterConfig config.ClusterConfig, | ||
adminClient admin.Client, | ||
) error { | ||
topicConfig.SetDefaults() | ||
topicInfo, err := adminClient.GetTopic(ctx, topicConfig.Meta.Name, true) | ||
if err != nil { | ||
if err == admin.ErrTopicDoesNotExist { | ||
return fmt.Errorf("Topic: %s does not exist in Kafka cluster", topicConfig.Meta.Name) | ||
} | ||
return err | ||
} | ||
log.Debugf("topicInfo from kafka: %+v", topicInfo) | ||
|
||
if err := rebalanceTopicCheck(topicConfig, topicInfo); err != nil { | ||
return err | ||
} | ||
|
||
retentionDropStepDuration, err := clusterConfig.GetDefaultRetentionDropStepDuration() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
applierConfig := apply.TopicApplierConfig{ | ||
BrokerThrottleMBsOverride: rebalanceConfig.brokerThrottleMBsOverride, | ||
BrokersToRemove: rebalanceConfig.brokersToRemove, | ||
ClusterConfig: clusterConfig, | ||
DryRun: rebalanceConfig.dryRun, | ||
PartitionBatchSizeOverride: rebalanceConfig.partitionBatchSizeOverride, | ||
Rebalance: true, // to enforce action: rebalance | ||
AutoContinueRebalance: true, // to continue without prompts | ||
RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance | ||
SkipConfirm: true, // to enforce action: rebalance | ||
SleepLoopDuration: rebalanceConfig.sleepLoopDuration, | ||
TopicConfig: topicConfig, | ||
} | ||
|
||
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) | ||
if err := cliRunner.ApplyTopic(ctx, applierConfig); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// build ctx map for rebalance progress | ||
func getRebalanceCtxStruct(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxStruct, error) { | ||
rebalanceCtxStruct := util.RebalanceCtxStruct{ | ||
Enabled: true, | ||
Interval: rebalanceConfig.showProgressInterval, | ||
} | ||
|
||
zeroDur, _ := time.ParseDuration("0s") | ||
if rebalanceConfig.showProgressInterval == zeroDur { | ||
rebalanceCtxStruct.Enabled = false | ||
log.Infof("--progress-interval is 0s. Not showing progress...") | ||
} else if rebalanceConfig.showProgressInterval < zeroDur { | ||
return rebalanceCtxStruct, fmt.Errorf("--show-progress-interval should be > 0s") | ||
} | ||
|
||
if rebalanceConfig.dryRun { | ||
rebalanceCtxStruct.Enabled = false | ||
log.Infof("--dry-run enabled. Not showing progress...") | ||
return rebalanceCtxStruct, nil | ||
} | ||
|
||
return rebalanceCtxStruct, nil | ||
} | ||
|
||
// get all files for a given dir path | ||
func getAllFiles(dir string) ([]string, error) { | ||
var files []string | ||
|
||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if !info.IsDir() { | ||
files = append(files, path) | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return files, err | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming you just used the wording from the rebalance section but "full broker rebalance" suggests something else to me. How about "full cluster rebalance" or "all broker rebalance"? This change should be made in all places it appears.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"full cluster rebalance" this makes sense to me..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about modifying at other places. But I am modifying atleast in the subcommand
rebalance