diff --git a/replicator/partition_replicator.go b/replicator/partition_replicator.go index c0405244..30e618c9 100644 --- a/replicator/partition_replicator.go +++ b/replicator/partition_replicator.go @@ -6,6 +6,7 @@ package replicator import ( "bytes" + "fmt" "io" "math/rand" @@ -23,16 +24,19 @@ type Options struct { type PartitionReplicator struct { *Options highwaterMarkOffset int64 + clientID string offset int64 msgs chan []byte done chan struct{} } func NewPartitionReplicator(options *Options) (*PartitionReplicator, error) { + clientID := fmt.Sprintf("Partition Replicator for Broker/Topic/Partition: [%d/%s/%d]", options.Partition.Leader.ID, options.Partition.Topic, options.Partition.Partition) return &PartitionReplicator{ - Options: options, - done: make(chan struct{}, 2), - msgs: make(chan []byte, 2), + Options: options, + clientID: clientID, + done: make(chan struct{}, 2), + msgs: make(chan []byte, 2), }, nil } @@ -60,7 +64,7 @@ func (r *PartitionReplicator) fetchMessages() error { } var req protocol.Encoder = &protocol.Request{ CorrelationID: rand.Int31(), - ClientID: "client_id", + ClientID: r.clientID, Body: fetchBody, } b, err := protocol.Encode(req)