Skip to content

Commit

Permalink
Simplify BidirectionalChannelHalf new and spawn
Browse files Browse the repository at this point in the history
As suggested by @jarhodes314.

Let BidirectionalChannelHalf::new take the AsyncClient target and spawn
the publisher loop. Doing so, the unbounded receiver can be directly passed to spawn_publisher
with no need to be temporarily store in the BidirectionalChannelHalf struct.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
  • Loading branch information
didier-wenzek committed Sep 13, 2024
1 parent 3b2235f commit 5783f15
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ impl MqttBridgeActorBuilder {
.map(|t| SubscribeFilter::new(t.to_owned(), QoS::AtLeastOnce))
.collect();

let [msgs_local, msgs_cloud] = bidirectional_channel(in_flight.into());
let [msgs_local, msgs_cloud] =
bidirectional_channel(cloud_client.clone(), local_client.clone(), in_flight.into());
let [(convert_local, bidir_local), (convert_cloud, bidir_cloud)] =
rules.converters_and_bidirectional_topic_filters();
let (tx_status, monitor) = BridgeHealthMonitor::new(health_topic.name.clone(), &msgs_cloud);
Expand Down Expand Up @@ -168,16 +169,22 @@ impl MqttBridgeActorBuilder {
}
}

fn bidirectional_channel<T>(buffer: usize) -> [BidirectionalChannelHalf<T>; 2] {
fn bidirectional_channel(
cloud_client: AsyncClient,
local_client: AsyncClient,
buffer: usize,
) -> [BidirectionalChannelHalf<Publish>; 2] {
let (tx_first, rx_first) = mpsc::channel(buffer);
let (tx_second, rx_second) = mpsc::channel(buffer);
[
BidirectionalChannelHalf::new(tx_first, rx_second),
BidirectionalChannelHalf::new(tx_second, rx_first),
BidirectionalChannelHalf::new(cloud_client, tx_first, rx_second),
BidirectionalChannelHalf::new(local_client, tx_second, rx_first),
]
}

struct BidirectionalChannelHalf<T> {
/// MQTT target for the messages
target: AsyncClient,
/// Sends messages to the companion half bridge
tx: mpsc::Sender<Option<(String, T)>>,
/// Receives messages from the companion half bridge
Expand All @@ -194,22 +201,10 @@ struct BidirectionalChannelHalf<T> {
/// }
///
unbounded_tx: mpsc::UnboundedSender<(Option<String>, T)>,
/// Used by the background task
unbounded_rx: Option<mpsc::UnboundedReceiver<(Option<String>, T)>>,
}

impl<'a, T> BidirectionalChannelHalf<T> {
fn new(tx: mpsc::Sender<Option<(String, T)>>, rx: mpsc::Receiver<Option<(String, T)>>) -> Self {
let (unbounded_tx, unbounded_rx) = mpsc::unbounded::<(Option<String>, T)>();
BidirectionalChannelHalf {
tx,
rx,
unbounded_tx,
unbounded_rx: Some(unbounded_rx),
}
}

pub fn send(
impl<T> BidirectionalChannelHalf<T> {
pub fn send<'a>(
&'a mut self,
target_topic: Option<String>,
message: T,
Expand All @@ -228,8 +223,27 @@ impl<'a, T> BidirectionalChannelHalf<T> {
}

impl BidirectionalChannelHalf<Publish> {
pub fn spawn_publisher(&mut self, target: AsyncClient) {
let mut unbounded_rx = self.unbounded_rx.take().unwrap();
fn new(
target: AsyncClient,
tx: mpsc::Sender<Option<(String, Publish)>>,
rx: mpsc::Receiver<Option<(String, Publish)>>,
) -> Self {
let (unbounded_tx, unbounded_rx) = mpsc::unbounded();
let companion_bridge_half = BidirectionalChannelHalf {
target,
tx,
rx,
unbounded_tx,
};
companion_bridge_half.spawn_publisher(unbounded_rx);
companion_bridge_half
}

fn spawn_publisher(
&self,
mut unbounded_rx: mpsc::UnboundedReceiver<(Option<String>, Publish)>,
) {
let target = self.target.clone();
let mut tx = self.tx.clone();
tokio::spawn(async move {
while let Some((target_topic, publish)) = unbounded_rx.next().await {
Expand Down Expand Up @@ -359,8 +373,6 @@ async fn half_bridge(
topics: Vec<SubscribeFilter>,
reconnect_policy: TEdgeConfigReaderMqttBridgeReconnectPolicy,
) {
companion_bridge_half.spawn_publisher(target.clone());

let mut backoff = CustomBackoff::new(
::backoff::SystemClock {},
reconnect_policy.initial_interval.duration(),
Expand Down

0 comments on commit 5783f15

Please sign in to comment.