Skip to content

Commit

Permalink
Allow injecting an existing connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Jun 2, 2019
1 parent e7ab8eb commit 1be015e
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 6 deletions.
42 changes: 39 additions & 3 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ where
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
Expand Down Expand Up @@ -365,6 +365,42 @@ where
self.inner.broadcast_event(event)
}

/// Adds an existing connection to a node to the collection.
///
/// Returns whether we have replaced an existing connection, or not.
pub fn add_connection<TMuxer>(&mut self, conn_info: TConnInfo, user_data: TUserData, muxer: TMuxer, handler: THandler::Handler)
-> CollectionNodeAccept<TConnInfo, TUserData>
where
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
{
// Calling `HandledNodesTasks::add_connection` is the same as calling
// `HandledNodesTasks::add_reach_attempt`, except that we don't get any `NodeReached` event.
// We therefore implement this method the same way as calling `add_reach_attempt` followed
// with simulating a received `NodeReached` event and accepting it.

let task_id = self.inner.add_connection(
TaskState::Pending,
muxer,
handler
);

CollectionReachEvent {
conn_info: Some(conn_info),
id: task_id,
parent: self,
}.accept(user_data).0
}

/// Grants access to an object that allows controlling a peer of the collection.
///
/// Returns `None` if we don't have a connection to this peer.
Expand Down
43 changes: 40 additions & 3 deletions core/src/nodes/handled_node_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConn
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
Expand All @@ -244,6 +244,43 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConn
task_id
}

/// Adds an existing connection to a node to the collection.
///
/// This method spawns a task dedicated to processing the node's events.
///
/// No `NodeReached` event will be emitted for this task, since the node has already been
/// reached.
pub fn add_connection<TMuxer, THandler>(&mut self, user_data: TUserData, muxer: TMuxer, handler: THandler) -> TaskId
where
TIntoHandler: IntoNodeHandler<TConnInfo, Handler = THandler> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

let (tx, rx) = mpsc::unbounded();
self.tasks.insert(task_id, (tx, user_data));

let task: NodeTask<futures::future::Empty<_, _>, _, _, _, _, _, _> = NodeTask {
taken_over: SmallVec::new(),
inner: NodeTaskInner::Node(HandledNode::new(muxer, handler)),
events_tx: self.events_tx.clone(),
in_events_rx: rx.fuse(),
id: task_id,
};

self.to_spawn.push(Box::new(task));
task_id
}

/// Sends an event to all the tasks, including the pending ones.
pub fn broadcast_event(&mut self, event: &TInEvent)
where TInEvent: Clone,
Expand Down
33 changes: 33 additions & 0 deletions core/src/nodes/raw_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1784,6 +1784,39 @@ where
Ok(self.connect_inner(handler, first, rest))
}

/// Moves the given node to a connected state using the given connection info and muxer.
///
/// No `Connected` event is generated for this action.
///
/// # Panic
///
/// Panics if `conn_info.peer_id()` is not the current peer.
///
pub fn inject_connection(self, conn_info: TConnInfo, connected_point: ConnectedPoint, muxer: TMuxer, handler: THandler::Handler)
-> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
where
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
TPeerId: Eq + Hash + Clone,
{
if conn_info.peer_id() != &self.peer_id {
panic!("Mismatch between conn_info PeerId and request PeerId");
}

match self.nodes.active_nodes.add_connection((conn_info, connected_point), (), muxer, handler) {
CollectionNodeAccept::NewEntry => {},
CollectionNodeAccept::ReplacedExisting { .. } =>
unreachable!("We can only build a PeerNotConnected if we don't have this peer in \
the collection yet"),
}

PeerConnected {
active_nodes: &mut self.nodes.active_nodes,
connected_points: &mut self.nodes.reach_attempts.connected_points,
out_reach_attempts: &mut self.nodes.reach_attempts.out_reach_attempts,
peer_id: self.peer_id,
}
}

/// Inner implementation of `connect`.
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
Expand Down

0 comments on commit 1be015e

Please sign in to comment.