Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7311: add peertask foundation code #7628

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

Matilda-Clerke
Copy link
Contributor

PR description

This PR adds the foundational code for the new peer task system. For details about how this is used, please see the spike PR proving the concept is workable.

Copy link
Contributor

@pinges pinges left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments

import org.slf4j.LoggerFactory;

/** "Manages" the EthPeers for the PeerTaskExecutor */
public class PeerManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code a bit I think that PeerManager could just be an interface with one method: getPeer()
Different implementations could give us the behaviour that we are looking for, like how many retries, retry the same peer/other peers.
This would also allow us to pick peers when they are needed, e.g. to make sure that these peers don't have too many open requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interface to allow easy swapping of different implementations is a fine, but I'm not sure how we'd implement retries in the peer manager... the peer manager is completely separated from the PeerTasks and has no idea what tasks are being executed and whether they succeed or fail.


public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int triesRemaining =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be handled by the peer manager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand how that would work. The peer manager doesn't have any connection to the tasks being run

}
} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't care about which peer is used you would try with another peer ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the executeAgainstPeer() method, so at this point in the code we do care which peer is used. If the calling code doesn't care which peer a task is run against, it can call the execute() method instead, which contains the logic for peer switching.

} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be handled by the peer manager

Copy link
Contributor Author

@Matilda-Clerke Matilda-Clerke Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how exactly. The peer manager doesn't (and shouldn't) know anything about whether a peer is requested for a retry or for a first attempt.

}
}

private static boolean isPeerUnused(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be handled by the peer manager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods are here because they are used in the predicate specifically to request peers for the peer task executor. They may not be applicable to all calls to peerManager.getPeer.

They certainly could be moved to PeerManager, but that more tightly couples the peer manager to this one specific use case. Maintaining separation is what gives and maintains the simplicity of this code, as compared with the old system.

return !usedEthPeers.contains(ethPeer);
}

private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be handled by the peer manager

return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}

private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be handled by the peer manager

import java.util.concurrent.TimeoutException;

public class PeerTaskRequestSender {
private static final long DEFAULT_TIMEOUT_MS = 20_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at the moment we have a timeout of 5s

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll update it to 5s for now. In future, it might be good to make this configurable somehow, so end users can tune it to work best in their environment. But that's definitely out of scope for this work.

(boolean streamClosed, MessageData message, EthPeer peer) -> {
responseMessageDataFuture.complete(message);
});
return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that!

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants