-
Notifications
You must be signed in to change notification settings - Fork 804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
7311: add peertask foundation code #7628
base: main
Are you sure you want to change the base?
Changes from 5 commits
4b80016
a8d5a9f
e4be5c0
5859444
c335cbe
08c66fd
049cae2
5afba63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
public class InvalidPeerTaskResponseException extends Exception { | ||
|
||
public InvalidPeerTaskResponseException() { | ||
super(); | ||
} | ||
|
||
public InvalidPeerTaskResponseException(final Throwable cause) { | ||
super(cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
public class NoAvailablePeerException extends Exception {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; | ||
import org.hyperledger.besu.ethereum.p2p.peers.PeerId; | ||
|
||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Predicate; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** "Manages" the EthPeers for the PeerTaskExecutor */ | ||
public class PeerManager { | ||
private static final Logger LOG = LoggerFactory.getLogger(PeerManager.class); | ||
|
||
// use a synchronized map to ensure the map is never modified by multiple threads at once | ||
private final Map<PeerId, EthPeer> ethPeersByPeerId = | ||
Collections.synchronizedMap(new HashMap<>()); | ||
|
||
/** | ||
* Gets the highest reputation peer matching the supplies filter | ||
* | ||
* @param filter a filter to match prospective peers with | ||
* @return the highest reputation peer matching the supplies filter | ||
* @throws NoAvailablePeerException If there are no suitable peers | ||
*/ | ||
public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException { | ||
LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size()); | ||
return ethPeersByPeerId.values().stream() | ||
.filter(filter) | ||
.max(Comparator.naturalOrder()) | ||
.orElseThrow(NoAvailablePeerException::new); | ||
} | ||
|
||
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) { | ||
return Optional.ofNullable(ethPeersByPeerId.get(peerId)); | ||
} | ||
|
||
public void addPeer(final EthPeer ethPeer) { | ||
ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer); | ||
} | ||
|
||
public void removePeer(final PeerId peerId) { | ||
ethPeersByPeerId.remove(peerId); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; | ||
|
||
import java.util.Collection; | ||
|
||
/** | ||
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor | ||
* | ||
* @param <T> The type of the result of this PeerTask | ||
*/ | ||
public interface PeerTask<T> { | ||
/** | ||
* Returns the SubProtocol used for this PeerTask | ||
* | ||
* @return the SubProtocol used for this PeerTask | ||
*/ | ||
String getSubProtocol(); | ||
|
||
/** | ||
* Gets the minimum required block number for a peer to have to successfully execute this task | ||
* | ||
* @return the minimum required block number for a peer to have to successfully execute this task | ||
*/ | ||
long getRequiredBlockNumber(); | ||
|
||
/** | ||
* Gets the request data to send to the EthPeer | ||
* | ||
* @return the request data to send to the EthPeer | ||
*/ | ||
MessageData getRequestMessage(); | ||
|
||
/** | ||
* Parses the MessageData response from the EthPeer | ||
* | ||
* @param messageData the response MessageData to be parsed | ||
* @return a T built from the response MessageData | ||
* @throws InvalidPeerTaskResponseException if the response messageData is invalid | ||
*/ | ||
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; | ||
|
||
/** | ||
* Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor | ||
* | ||
* @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor | ||
*/ | ||
Collection<PeerTaskBehavior> getPeerTaskBehaviors(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
public enum PeerTaskBehavior { | ||
RETRY_WITH_SAME_PEER, | ||
RETRY_WITH_OTHER_PEERS | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; | ||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; | ||
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; | ||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; | ||
import org.hyperledger.besu.metrics.BesuMetricCategory; | ||
import org.hyperledger.besu.plugin.services.MetricsSystem; | ||
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; | ||
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Supplier; | ||
|
||
/** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */ | ||
public class PeerTaskExecutor { | ||
private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000}; | ||
|
||
private final PeerManager peerManager; | ||
private final PeerTaskRequestSender requestSender; | ||
private final Supplier<ProtocolSpec> protocolSpecSupplier; | ||
private final LabelledMetric<OperationTimer> requestTimer; | ||
|
||
public PeerTaskExecutor( | ||
final PeerManager peerManager, | ||
final PeerTaskRequestSender requestSender, | ||
final Supplier<ProtocolSpec> protocolSpecSupplier, | ||
final MetricsSystem metricsSystem) { | ||
this.peerManager = peerManager; | ||
this.requestSender = requestSender; | ||
this.protocolSpecSupplier = protocolSpecSupplier; | ||
requestTimer = | ||
metricsSystem.createLabelledTimer( | ||
BesuMetricCategory.PEERS, "Peer Task Executor Request Time", "", "Task Class Name"); | ||
} | ||
|
||
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) { | ||
PeerTaskExecutorResult<T> executorResult; | ||
int triesRemaining = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this could be handled by the peer manager There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand how that would work. The peer manager doesn't have any connection to the tasks being run |
||
peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS) ? 3 : 1; | ||
final Collection<EthPeer> usedEthPeers = new ArrayList<>(); | ||
do { | ||
EthPeer peer; | ||
try { | ||
peer = | ||
peerManager.getPeer( | ||
(candidatePeer) -> | ||
isPeerUnused(candidatePeer, usedEthPeers) | ||
&& (protocolSpecSupplier.get().isPoS() | ||
|| isPeerHeightHighEnough( | ||
candidatePeer, peerTask.getRequiredBlockNumber())) | ||
&& isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol())); | ||
usedEthPeers.add(peer); | ||
executorResult = executeAgainstPeer(peerTask, peer); | ||
} catch (NoAvailablePeerException e) { | ||
executorResult = | ||
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); | ||
} | ||
} while (--triesRemaining > 0 | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS); | ||
|
||
return executorResult; | ||
} | ||
|
||
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) { | ||
return CompletableFuture.supplyAsync(() -> execute(peerTask)); | ||
} | ||
|
||
public <T> PeerTaskExecutorResult<T> executeAgainstPeer( | ||
final PeerTask<T> peerTask, final EthPeer peer) { | ||
MessageData requestMessageData = peerTask.getRequestMessage(); | ||
PeerTaskExecutorResult<T> executorResult; | ||
int triesRemaining = | ||
peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_SAME_PEER) ? 3 : 1; | ||
do { | ||
try { | ||
|
||
MessageData responseMessageData; | ||
try (final OperationTimer.TimingContext timingContext = | ||
requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { | ||
responseMessageData = | ||
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); | ||
} | ||
T result = peerTask.parseResponse(responseMessageData); | ||
peer.recordUsefulResponse(); | ||
executorResult = new PeerTaskExecutorResult<>(result, PeerTaskExecutorResponseCode.SUCCESS); | ||
|
||
} catch (PeerConnection.PeerNotConnected e) { | ||
executorResult = | ||
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.PEER_DISCONNECTED); | ||
|
||
} catch (InterruptedException | TimeoutException e) { | ||
peer.recordRequestTimeout(requestMessageData.getCode()); | ||
executorResult = new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.TIMEOUT); | ||
|
||
} catch (InvalidPeerTaskResponseException e) { | ||
peer.recordUselessResponse(e.getMessage()); | ||
executorResult = | ||
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INVALID_RESPONSE); | ||
|
||
} catch (ExecutionException e) { | ||
executorResult = | ||
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); | ||
} | ||
} while (--triesRemaining > 0 | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS | ||
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you don't care about which peer is used you would try with another peer ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is in the |
||
&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining])); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be handled by the peer manager There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how exactly. The peer manager doesn't (and shouldn't) know anything about whether a peer is requested for a retry or for a first attempt. |
||
|
||
return executorResult; | ||
} | ||
|
||
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAgainstPeerAsync( | ||
final PeerTask<T> peerTask, final EthPeer peer) { | ||
return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer)); | ||
} | ||
|
||
private boolean sleepBetweenRetries(final long sleepTime) { | ||
try { | ||
Thread.sleep(sleepTime); | ||
return true; | ||
} catch (InterruptedException e) { | ||
return false; | ||
} | ||
} | ||
|
||
private static boolean isPeerUnused( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be handled by the peer manager There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These methods are here because they are used in the predicate specifically to request peers for the peer task executor. They may not be applicable to all calls to They certainly could be moved to PeerManager, but that more tightly couples the peer manager to this one specific use case. Maintaining separation is what gives and maintains the simplicity of this code, as compared with the old system. |
||
final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) { | ||
return !usedEthPeers.contains(ethPeer); | ||
} | ||
|
||
private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be handled by the peer manager |
||
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; | ||
} | ||
|
||
private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be handled by the peer manager |
||
return ethPeer.getProtocolName().equals(protocol); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright contributors to Hyperledger Besu. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.hyperledger.besu.ethereum.eth.manager.peertask; | ||
|
||
public enum PeerTaskExecutorResponseCode { | ||
SUCCESS, | ||
NO_PEER_AVAILABLE, | ||
PEER_DISCONNECTED, | ||
INTERNAL_SERVER_ERROR, | ||
TIMEOUT, | ||
INVALID_RESPONSE | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the code a bit I think that PeerManager could just be an interface with one method: getPeer()
Different implementations could give us the behaviour that we are looking for, like how many retries, retry the same peer/other peers.
This would also allow us to pick peers when they are needed, e.g. to make sure that these peers don't have too many open requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An interface to allow easy swapping of different implementations is a fine, but I'm not sure how we'd implement retries in the peer manager... the peer manager is completely separated from the PeerTasks and has no idea what tasks are being executed and whether they succeed or fail.