Skip to content

Commit

Permalink
Fix reduce replica count generator
Browse files Browse the repository at this point in the history
  • Loading branch information
pecigonzalo committed Oct 24, 2022
1 parent 9a0dc08 commit e4bb594
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 14 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ default: testacc
testacc:
TF_ACC=1 go test ./... -v $(TESTARGS) -timeout 120m

.PHONY:
test:
go test ./... -v $(TESTARGS) -timeout 120m

.PHONY: install
install:
go install
Expand Down
39 changes: 25 additions & 14 deletions internal/provider/topic_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package provider
import (
"context"
"fmt"
"sort"
"time"

"github.com/hashicorp/terraform-plugin-framework/attr"
"github.com/hashicorp/terraform-plugin-framework/diag"
Expand Down Expand Up @@ -242,23 +240,23 @@ func (r *TopicResource) Update(ctx context.Context, req resource.UpdateRequest,
}

if !data.Config.Equal(state.Config) {
tflog.Debug(ctx, "Updating topic configuration")
tflog.Info(ctx, "Updating topic configuration")
err := r.updateConfig(ctx, data, req, resp)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update topic configuration, got error: %s", err))
return
}
}
if !data.ReplicationFactor.Equal(state.ReplicationFactor) {
tflog.Debug(ctx, "Updating topic replication factor")
tflog.Info(ctx, "Updating topic replication factor")
err := r.updateReplicationFactor(ctx, state, data, req, resp)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update topic replication factor, got error: %s", err))
return
}
}
if !data.Partitions.Equal(state.Partitions) {
tflog.Debug(ctx, "Updating topic partitions")
tflog.Info(ctx, "Updating topic partitions")
err := r.updatePartitions(ctx, state, data, req, resp)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update topic partitions, got error: %s", err))
Expand Down Expand Up @@ -358,7 +356,6 @@ func (r *TopicResource) updateReplicationFactor(ctx context.Context, state *Topi
replicasWanted := data.ReplicationFactor.Value
replicasPresent := state.ReplicationFactor.Value

// TODO: From here on, this method is quite ugly, we need to refactor it
var newPartitionsInfo []admin.PartitionInfo
for _, partition := range topicInfo.Partitions {
if replicasWanted > replicasPresent {
Expand Down Expand Up @@ -395,7 +392,6 @@ func (r *TopicResource) updateReplicationFactor(ctx context.Context, state *Topi
alterPartitionReassignmentsRequest := kafka.AlterPartitionReassignmentsRequest{
Topic: data.Name.Value,
Assignments: apiAssignments,
Timeout: time.Minute * 30, // TODO: Allow for configuration in the provider
}

tflog.Info(ctx, fmt.Sprintf("%v", alterPartitionReassignmentsRequest.Assignments))
Expand All @@ -407,6 +403,17 @@ func (r *TopicResource) updateReplicationFactor(ctx context.Context, state *Topi
if clientResp.Error != nil {
return err
}
if len(clientResp.PartitionResults) > 0 {
partErrors := []error{}
for _, partResult := range clientResp.PartitionResults {
if partResult.Error != nil {
partErrors = append(partErrors, partResult.Error)
}
}
if len(partErrors) > 0 {
return fmt.Errorf("errors changing replication factor: %s", partErrors)
}
}
return nil
}

Expand Down Expand Up @@ -434,14 +441,15 @@ func increaseReplicas(desired int, replicas []int, brokerIDs []int) []int {

func reduceReplicas(desired int, replicas []int, leader int) []int {
if len(replicas) > desired {
newReplicas := []int{}
for i, replica := range replicas {
if replica == leader {
continue
} else {
replicas = append(replicas[:i], replicas[i+1:]...)
newReplicas = append(replicas[:i], replicas[i+1:]...)
}
}
return reduceReplicas(desired, replicas, leader)
return reduceReplicas(desired, newReplicas, leader)
} else {
return replicas
}
Expand All @@ -451,7 +459,6 @@ func (r *TopicResource) updatePartitions(ctx context.Context, state *TopicResour
if data.Partitions.Value < state.Partitions.Value {
return fmt.Errorf("partition count can't be reduced")
}
extraPartitions := int(data.Partitions.Value) - int(state.Partitions.Value)

brokerIDs, err := r.client.GetBrokerIDs(ctx)
if err != nil {
Expand All @@ -468,14 +475,14 @@ func (r *TopicResource) updatePartitions(ctx context.Context, state *TopicResour

currAssignments := topicInfo.ToAssignments()
for _, b := range brokersInfo {
tflog.Warn(ctx, b.Rack)
tflog.Debug(ctx, fmt.Sprintf("Broker ID: %v Rack: %s", b.ID, b.Rack))
}
extraPartitions := int(data.Partitions.Value) - int(state.Partitions.Value)

picker := pickers.NewRandomizedPicker()

extender := extenders.NewBalancedExtender(
brokersInfo,
true,
false,
picker,
)
desiredAssignments, err := extender.Extend(
Expand All @@ -486,11 +493,15 @@ func (r *TopicResource) updatePartitions(ctx context.Context, state *TopicResour
if err != nil {
return err
}
desiredAssignments = desiredAssignments[len(desiredAssignments)-extraPartitions:]

err = r.client.AssignPartitions(ctx, data.Name.Value, desiredAssignments)
tflog.Info(ctx, fmt.Sprintf("Assignments: %v", desiredAssignments))

err = r.client.AddPartitions(ctx, data.Name.Value, desiredAssignments)
if err != nil {
return err
}

return nil
}

Expand Down
44 changes: 44 additions & 0 deletions internal/provider/topic_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/stretchr/testify/assert"
)

func TestAccTopicResource(t *testing.T) {
Expand Down Expand Up @@ -60,3 +61,46 @@ resource "kafka_topic" "test" {
}
`, name, partitions, replication_factor)
}

func TestIncreaseReplicas(t *testing.T) {
assert := assert.New(t)
desiredCount := 3
currentReplicas := []int{2, 1}
brokerIDs := []int{1, 2, 3}

expectedReplicas := []int{2, 1, 3}
newReplicas := increaseReplicas(desiredCount, currentReplicas, brokerIDs)

assert.EqualValues(expectedReplicas, newReplicas, "Increase replica expected should be the same")
}

func TestReduceReplicas(t *testing.T) {
assert := assert.New(t)

desiredCount := 1
currentReplicas := []int{2, 1, 3}
leader := 2

expectedReplicas := []int{2}
newReplicas := reduceReplicas(desiredCount, currentReplicas, leader)

assert.Equal(expectedReplicas, newReplicas, "Increase replica expected should be the same")

desiredCount = 2
currentReplicas = []int{2, 1, 3}
leader = 2

expectedReplicas = []int{2, 3}
newReplicas = reduceReplicas(desiredCount, currentReplicas, leader)

assert.Equal(expectedReplicas, newReplicas, "Increase replica expected should be the same")

desiredCount = 2
currentReplicas = []int{2, 1, 3, 5, 4}
leader = 2

expectedReplicas = []int{2, 4}
newReplicas = reduceReplicas(desiredCount, currentReplicas, leader)

assert.Equal(expectedReplicas, newReplicas, "Increase replica expected should be the same")
}

0 comments on commit e4bb594

Please sign in to comment.