Skip to content

Commit

Permalink
KAFKA-12297 - Making the MockProducer completion to return a dummy Re…
Browse files Browse the repository at this point in the history
…cordMetadata class on exception to honour the completion contract for exception scenario
  • Loading branch information
aadubey authored and omkreddy committed Feb 12, 2021
1 parent 2294d10 commit 4c4b91f
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Cal
0L, 0, 0, Time.SYSTEM);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback);
RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback, topicPartition);

if (!this.transactionInFlight)
this.sent.add(record);
Expand Down Expand Up @@ -512,15 +512,18 @@ private static class Completion {
private final RecordMetadata metadata;
private final ProduceRequestResult result;
private final Callback callback;
private final TopicPartition tp;

public Completion(long offset,
RecordMetadata metadata,
ProduceRequestResult result,
Callback callback) {
Callback callback,
TopicPartition tp) {
this.metadata = metadata;
this.offset = offset;
this.result = result;
this.callback = callback;
this.tp = tp;
}

public void complete(RuntimeException e) {
Expand All @@ -529,7 +532,7 @@ public void complete(RuntimeException e) {
if (e == null)
callback.onCompletion(metadata, null);
else
callback.onCompletion(null, e);
callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1), e);
}
result.done();
}
Expand Down

0 comments on commit 4c4b91f

Please sign in to comment.