-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send acknowledgements to source when events are forwarded to remote peer #4305
Conversation
@@ -59,6 +59,7 @@ public class PipelineTransformer { | |||
private final EventFactory eventFactory; | |||
private final AcknowledgementSetManager acknowledgementSetManager; | |||
private final SourceCoordinatorFactory sourceCoordinatorFactory; | |||
private boolean acknowledgementsEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to have some basic unit test coverage
@@ -117,6 +119,8 @@ private void buildPipelineFromConfiguration( | |||
LOG.info("Building buffer for the pipeline [{}]", pipelineName); | |||
final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder()); | |||
|
|||
if (pipelineDefinedBuffer.isByteBuffer()) | |||
acknowledgementsEnabled = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be set to true for all sub-pipelines just because one sub-pipeline uses persistent buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I have added comment and the test case also validates this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the need for unit tests.
@@ -50,7 +50,7 @@ public ProcessWorker( | |||
this.pipeline = pipeline; | |||
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName()); | |||
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES); | |||
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled(); | |||
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.isByteBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this logic is right. A byte buffer in itself does not require acknowledgements. Can we add a new default method on Buffer
to express the actual need?
boolean isPersistentBuffer();
Or some other boolean?
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -78,6 +79,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, | |||
this.eventFactory = eventFactory; | |||
this.acknowledgementSetManager = acknowledgementSetManager; | |||
this.sourceCoordinatorFactory = sourceCoordinatorFactory; | |||
this.acknowledgementsEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is related to @graytaylor0 's comment below. This should probably not be a field variable and should be set for each pipeline independently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. It should be for all pipelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kkondaka , If I understand correctly, then a downstream pipeline needs to have acknowledgements enabled. This makes sense.
But, the current code ends up changing all pipelines. Some pipelines can run fully independently of each other without any direct relationship. Those would have acknowledgements enabled incorrectly.
@@ -78,6 +79,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, | |||
this.eventFactory = eventFactory; | |||
this.acknowledgementSetManager = acknowledgementSetManager; | |||
this.sourceCoordinatorFactory = sourceCoordinatorFactory; | |||
this.acknowledgementsEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kkondaka , If I understand correctly, then a downstream pipeline needs to have acknowledgements enabled. This makes sense.
But, the current code ends up changing all pipelines. Some pipelines can run fully independently of each other without any direct relationship. Those would have acknowledgements enabled incorrectly.
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice clean solution for that last request. Thank you!
Description
Send acknowledgements to source when events are forwarded to remote peer
Also, propagate the acknowledgements enabled flag in pipeline correctly when ByteBuffer is used.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.