Skip to content

Commit

Permalink
add Earliest and Latest reset options (#125)
Browse files Browse the repository at this point in the history
* update reset to be able to reset to earliest and latest to specific partitions

* remove comments

* rename reset flags

* rename var

* reverting changes to example cluster config

* refactoring code

* revert changes in cluster.yaml for local-cluster

* remove comments

* update reset.go

* update reset.go

* add test

* refactoring code

* change offset val in test

* refactoring code

* refactor code, add testcases for new strategies

* update comments

* rename GetOffset function to GetEarliestOrLatestOffset

* update reset.go
  • Loading branch information
SaiPrasanna9 committed Mar 27, 2023
1 parent 0f3b45c commit 9fc691d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 3 deletions.
45 changes: 42 additions & 3 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var resetOffsetsCmd = &cobra.Command{
type resetOffsetsCmdConfig struct {
offset int64
partitions []int
toEarliest bool
toLatest bool

shared sharedOptions
}
Expand All @@ -42,12 +44,30 @@ func init() {
[]int{},
"Partition (defaults to all)",
)
resetOffsetsCmd.Flags().BoolVar(
&resetOffsetsConfig.toEarliest,
"to-earliest",
false,
"Resets offsets of consumer group members to earliest offsets of partitions")
resetOffsetsCmd.Flags().BoolVar(
&resetOffsetsConfig.toLatest,
"to-latest",
false,
"Resets offsets of consumer group members to latest offsets of partitions")

addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
RootCmd.AddCommand(resetOffsetsCmd)
}

func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
resetOffsetSpecificaton := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."

if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest {
return errors.New(resetOffsetSpecificaton)

} else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) {
return errors.New(resetOffsetSpecificaton)
}
return resetOffsetsConfig.shared.validate()
}

Expand All @@ -72,20 +92,39 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error {
for _, partitionInfo := range topicInfo.Partitions {
partitionIDsMap[partitionInfo.ID] = struct{}{}
}

var resetOffsetsStrategy string
if resetOffsetsConfig.toLatest {
resetOffsetsStrategy = groups.LatestResetOffsetsStrategy
} else if resetOffsetsConfig.toEarliest {
resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy
}
partitionOffsets := map[int]int64{}

if len(resetOffsetsConfig.partitions) > 0 {
for _, partition := range resetOffsetsConfig.partitions {
if _, ok := partitionIDsMap[partition]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partition, topic)
}
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition)
if err != nil {
return err
}
} else {
partitionOffsets[partition] = resetOffsetsConfig.offset
}

partitionOffsets[partition] = resetOffsetsConfig.offset
}
} else {
for _, partitionInfo := range topicInfo.Partitions {
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID)
if err != nil {
return err
}
} else {
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset
}
}
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/groups/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,27 @@ func ResetOffsets(
},
)
}

// GetEarliestorLatestOffset gets earliest/latest offset for a given topic partition for resetting offsets of consumer group
func GetEarliestOrLatestOffset(
ctx context.Context,
connector *admin.Connector,
topic string,
strategy string,
partition int,
) (int64, error) {
if strategy == EarliestResetOffsetsStrategy {
partitionBound, err := messages.GetPartitionBounds(ctx, connector, topic, partition, 0)
if err != nil {
return 0, err
}
return partitionBound.FirstOffset, nil
} else if strategy == LatestResetOffsetsStrategy {
partitionBound, err := messages.GetPartitionBounds(ctx, connector, topic, partition, 0)
if err != nil {
return 0, err
}
return partitionBound.LastOffset, nil
}
return 0, errors.New("Invalid reset offset strategy provided.")
}
72 changes: 72 additions & 0 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,51 @@ func TestGetLags(t *testing.T) {
}
}

func TestGetEarliestOrLatestOffset(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
})
require.NoError(t, err)

topicName := createTestTopic(ctx, t, connector)
groupID := fmt.Sprintf("test-group-%s", topicName)

reader := kafka.NewReader(
kafka.ReaderConfig{
Brokers: []string{connector.Config.BrokerAddr},
Dialer: connector.Dialer,
GroupID: groupID,
Topic: topicName,
MinBytes: 50,
MaxBytes: 10000,
},
)

readerCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

for i := 0; i < 8; i++ {
_, err := reader.ReadMessage(readerCtx)
require.NoError(t, err)
}

groupDetails, err := GetGroupDetails(ctx, connector, groupID)
require.NoError(t, err)

groupPartitions := groupDetails.Members[0].TopicPartitions[topicName]

for _, partition := range groupPartitions {
offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition)
require.NoError(t, err)
assert.Equal(t, int64(4), offset)

offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition)
require.NoError(t, err)
assert.Equal(t, int64(0), offset)
}
}

func TestResetOffsets(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
Expand Down Expand Up @@ -162,6 +207,33 @@ func TestResetOffsets(t *testing.T) {
require.Equal(t, 2, len(lags))
assert.Equal(t, int64(2), lags[0].MemberOffset)
assert.Equal(t, int64(1), lags[1].MemberOffset)

// latest offset of partition 0
latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0)
require.NoError(t, err)
// earliest offset of partition 1
earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1)
require.NoError(t, err)

err = ResetOffsets(
ctx,
connector,
topicName,
groupID,
map[int]int64{
0: latestOffset,
1: earliestOffset,
},
)
require.NoError(t, err)

lags, err = GetMemberLags(ctx, connector, topicName, groupID)
require.NoError(t, err)

require.Equal(t, 2, len(lags))
assert.Equal(t, int64(4), lags[0].MemberOffset)
assert.Equal(t, int64(0), lags[1].MemberOffset)

}

func createTestTopic(
Expand Down
6 changes: 6 additions & 0 deletions pkg/groups/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,9 @@ func (m MemberPartitionLag) OffsetLag() int64 {
func (m MemberPartitionLag) TimeLag() time.Duration {
return m.NewestTime.Sub(m.MemberTime)
}

// Consumer Group Offset reset strategies
const (
LatestResetOffsetsStrategy string = "latest"
EarliestResetOffsetsStrategy string = "earliest"
)

0 comments on commit 9fc691d

Please sign in to comment.