Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Aggregate event handle #4625

Merged
merged 4 commits into from
Jun 17, 2024

Conversation

kkondaka
Copy link
Collaborator

Description

Add Aggregate event handle, for supporting e2e acks for aggregated events (in future)

Cleaned up a lot of API in AcknowledgementSetManager as part of this to simplify the code

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [X ] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Krishna Kondaka added 2 commits June 13, 2024 03:16
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@Override
public void acquireReference() {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a dangling semi-colon. {;

import java.time.Instant;
import java.io.Serializable;

public class AggregateEventHandle implements EventHandle, InternalEventHandle, Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a package-protected AbstractEventHandle that shares some of the logic? This will help clarify the differences between the two and also consolidate similar code.

when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class));
doNothing().when(eventHandle).release(true);
final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer(a-> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can just be when(acknowledgementSet).release(eventHandle, true).thenReturn(null);

when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class));
doNothing().when(eventHandle).release(true);
final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer(a-> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to above.

import java.time.Instant;
import java.io.Serializable;

public class AggregateEventHandle implements EventHandle, InternalEventHandle, Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this class be made internal? Are we going to have processors use this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It will be used by AggregateProcessor.

eventHandle.acquireReference();
verify(acknowledgementSet1).acquire(eventHandle);
verify(acknowledgementSet2).acquire(eventHandle);
assertThat(acquireCount, equalTo(2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this assertion? And also, why the acquireCount tracking?

I believe that lines 67-68 fully assert the same thing.

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
graytaylor0
graytaylor0 previously approved these changes Jun 17, 2024
Comment on lines -70 to -91
public void acquire(final EventHandle eventHandle) {
if (eventHandle == null) {
numNullHandles.incrementAndGet();
return;
}

DefaultAcknowledgementSet acknowledgementSet = getAcknowledgementSet(eventHandle);
lock.lock();
boolean exists = false;
try {
exists = acknowledgementSets.contains(acknowledgementSet);
} finally {
lock.unlock();
}
// if acknowledgementSet doesn't exist then it means that the
// event still active even after the acknowledgement set is
// cleaned up.
if (exists) {
acknowledgementSet.acquire(eventHandle);
} else {
LOG.warn("Trying acquire an event in an AcknowledgementSet that does not exist");
numInvalidAcquires.incrementAndGet();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this was unused?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graytaylor0 It was used but not needed after the new changes where the acquire is directly done on the event handle. When we first implemented this, event handle was not guaranteed to be there, so we have implemented it to go through DefaultAcknowledgementSetManager which in turn invokes AcknowledgementSetMonitor.acquire(). Now all that became unnecessary.

dlvenable
dlvenable previously approved these changes Jun 17, 2024
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@kkondaka kkondaka dismissed stale reviews from dlvenable and graytaylor0 via f0153a6 June 17, 2024 16:43
@dlvenable dlvenable merged commit a85e05e into opensearch-project:main Jun 17, 2024
43 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants