-
Notifications
You must be signed in to change notification settings - Fork 804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
7311: add peertask foundation code #7628
base: main
Are you sure you want to change the base?
7311: add peertask foundation code #7628
Conversation
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments
import org.slf4j.LoggerFactory; | ||
|
||
/** "Manages" the EthPeers for the PeerTaskExecutor */ | ||
public class PeerManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the code a bit I think that PeerManager could just be an interface with one method: getPeer()
Different implementations could give us the behaviour that we are looking for, like how many retries, retry the same peer/other peers.
This would also allow us to pick peers when they are needed, e.g. to make sure that these peers don't have too many open requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An interface to allow easy swapping of different implementations is a fine, but I'm not sure how we'd implement retries in the peer manager... the peer manager is completely separated from the PeerTasks and has no idea what tasks are being executed and whether they succeed or fail.
|
||
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) { | ||
PeerTaskExecutorResult<T> executorResult; | ||
int triesRemaining = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could be handled by the peer manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand how that would work. The peer manager doesn't have any connection to the tasks being run
} | ||
} while (--triesRemaining > 0 | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't care about which peer is used you would try with another peer ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in the executeAgainstPeer()
method, so at this point in the code we do care which peer is used. If the calling code doesn't care which peer a task is run against, it can call the execute()
method instead, which contains the logic for peer switching.
} while (--triesRemaining > 0 | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED | ||
&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be handled by the peer manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how exactly. The peer manager doesn't (and shouldn't) know anything about whether a peer is requested for a retry or for a first attempt.
} | ||
} | ||
|
||
private static boolean isPeerUnused( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be handled by the peer manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These methods are here because they are used in the predicate specifically to request peers for the peer task executor. They may not be applicable to all calls to peerManager.getPeer
.
They certainly could be moved to PeerManager, but that more tightly couples the peer manager to this one specific use case. Maintaining separation is what gives and maintains the simplicity of this code, as compared with the old system.
return !usedEthPeers.contains(ethPeer); | ||
} | ||
|
||
private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be handled by the peer manager
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; | ||
} | ||
|
||
private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be handled by the peer manager
import java.util.concurrent.TimeoutException; | ||
|
||
public class PeerTaskRequestSender { | ||
private static final long DEFAULT_TIMEOUT_MS = 20_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think at the moment we have a timeout of 5s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I'll update it to 5s for now. In future, it might be good to make this configurable somehow, so end users can tune it to work best in their environment. But that's definitely out of scope for this work.
(boolean streamClosed, MessageData message, EthPeer peer) -> { | ||
responseMessageDataFuture.complete(message); | ||
}); | ||
return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that!
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
PR description
This PR adds the foundational code for the new peer task system. For details about how this is used, please see the spike PR proving the concept is workable.