diff --git a/crates/extensions/tedge_mqtt_bridge/src/health.rs b/crates/extensions/tedge_mqtt_bridge/src/health.rs index e24284c0d8..456cc6f4b4 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/health.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/health.rs @@ -1,13 +1,14 @@ use crate::overall_status; -use crate::BidirectionalChannelHalf; +use crate::BridgeAsyncClient; +use crate::BridgeMessageSender; use crate::Status; use futures::channel::mpsc; use futures::SinkExt; use futures::StreamExt; -use rumqttc::AsyncClient; use rumqttc::ConnectionError; use rumqttc::Event; use rumqttc::Incoming; +use rumqttc::Publish; use rumqttc::QoS; use std::collections::HashMap; use tracing::error; @@ -17,24 +18,21 @@ use tracing::log::info; /// /// When [Self::monitor] runs, this will watch the status of the bridge halves, and notify the /// relevant MQTT topic about the overall health. -pub struct BridgeHealthMonitor { - target: AsyncClient, +pub struct BridgeHealthMonitor { topic: String, rx_status: mpsc::Receiver<(&'static str, Status)>, - companion_bridge_half: mpsc::Sender>, + companion_bridge_half: BridgeMessageSender, } -impl BridgeHealthMonitor { +impl BridgeHealthMonitor { pub(crate) fn new( - target: AsyncClient, topic: String, - bridge_half: &BidirectionalChannelHalf>, + bridge_half: &BridgeAsyncClient, ) -> (mpsc::Sender<(&'static str, Status)>, Self) { let (tx, rx_status) = mpsc::channel(10); ( tx, BridgeHealthMonitor { - target, topic, rx_status, companion_bridge_half: bridge_half.clone_sender(), @@ -53,14 +51,15 @@ impl BridgeHealthMonitor { if last_status != status { last_status = status; - // TODO could this deadlock? 
- self.target - .publish(&self.topic, QoS::AtLeastOnce, true, status.unwrap().json()) - .await - .unwrap(); - // Send a note that a message has been published to maintain synchronisation - // between the two bridge halves - self.companion_bridge_half.send(None).await.unwrap(); + let mut health_msg = + Publish::new(&self.topic, QoS::AtLeastOnce, status.unwrap().json()); + health_msg.retain = true; + + // Publish the health message over MQTT, but with no duplicate for the companion + // as this message doesn't have to be acknowledged + self.companion_bridge_half + .internal_publish(health_msg) + .await; } } } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index f1d32d6cfc..191a1028ed 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -26,6 +26,9 @@ use std::collections::hash_map; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::Infallible; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tedge_actors::futures::channel::mpsc; @@ -107,8 +110,19 @@ impl MqttBridgeActorBuilder { cloud_config.set_manual_acks(true); cloud_config.set_max_packet_size(MAX_PACKET_SIZE, MAX_PACKET_SIZE); - let (local_client, local_event_loop) = AsyncClient::new(local_config, 10); - let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, 10); + // When configured with a low max inflight count of messages, rumqttc might reuse the pkid of message not acknowledged yet + // leading to the confusing messages: + // 2024-09-10T16:13:23.497043857Z INFO rumqttc::state: Collision on packet id = 1 + // 2024-09-10T16:13:23.497072791Z INFO tedge_mqtt_bridge: Received notification (cloud) Outgoing(AwaitAck(1)) + // 2024-09-10T16:13:23.497100479Z INFO tedge_mqtt_bridge: Bridge cloud connection still waiting ack for pkid=1 + // 
2024-09-10T16:13:23.608007183Z INFO tedge_mqtt_bridge: Received notification (cloud) Outgoing(Publish(1)) + // 2024-09-10T16:13:23.608233911Z INFO tedge_mqtt_bridge: Bridge cloud connection ignoring already known pkid=1 + // + // To prevent that, rumqttc inflight is set far bigger than the number of expected inflight messages. + let in_flight: u16 = 100; + cloud_config.set_inflight(in_flight * 5); + let (local_client, local_event_loop) = AsyncClient::new(local_config, in_flight.into()); + let (cloud_client, cloud_event_loop) = AsyncClient::new(cloud_config, in_flight.into()); let local_topics: Vec<_> = rules .local_subscriptions() @@ -119,19 +133,19 @@ impl MqttBridgeActorBuilder { .map(|t| SubscribeFilter::new(t.to_owned(), QoS::AtLeastOnce)) .collect(); - let [msgs_local, msgs_cloud] = bidirectional_channel(10); + let [cloud_target, local_target] = + bidirectional_channel(cloud_client.clone(), local_client.clone(), in_flight.into()); let [(convert_local, bidir_local), (convert_cloud, bidir_cloud)] = rules.converters_and_bidirectional_topic_filters(); let (tx_status, monitor) = - BridgeHealthMonitor::new(local_client.clone(), health_topic.name.clone(), &msgs_cloud); + BridgeHealthMonitor::new(health_topic.name.clone(), &local_target); tokio::spawn(monitor.monitor()); tokio::spawn(half_bridge( local_event_loop, - cloud_client.clone(), local_client.clone(), + cloud_target, convert_local, bidir_local, - msgs_local, tx_status.clone(), "local", local_topics, @@ -139,11 +153,10 @@ impl MqttBridgeActorBuilder { )); tokio::spawn(half_bridge( cloud_event_loop, - local_client.clone(), cloud_client.clone(), + local_target, convert_cloud, bidir_cloud, - msgs_cloud, tx_status.clone(), "cloud", cloud_topics, @@ -158,37 +171,171 @@ impl MqttBridgeActorBuilder { } } -fn bidirectional_channel(buffer: usize) -> [BidirectionalChannelHalf; 2] { +fn bidirectional_channel( + cloud_client: AsyncClient, + local_client: AsyncClient, + buffer: usize, +) -> [BridgeAsyncClient; 2] { let 
(tx_first, rx_first) = mpsc::channel(buffer); let (tx_second, rx_second) = mpsc::channel(buffer); [ - BidirectionalChannelHalf { - tx: tx_first, - rx: rx_second, - }, - BidirectionalChannelHalf { - tx: tx_second, - rx: rx_first, - }, + BridgeAsyncClient::new(cloud_client, tx_first, rx_second), + BridgeAsyncClient::new(local_client, tx_second, rx_first), ] } -struct BidirectionalChannelHalf { - tx: mpsc::Sender, - rx: mpsc::Receiver, +enum BridgeMessage { + /// A message to be published to a given target topic + /// + /// This message will have to be acknowledged to its source by the companion half bridge + BridgePub { + target_topic: String, + publish: Publish, + }, + + /// A message to be acknowledged on the target + /// + /// This message has been received by the companion half bridge + BridgeAck { publish: Publish }, + + /// A message *generated* by the bridge + /// + /// This message does not have to be acknowledged, as it was not received by the bridge. + Pub { publish: Publish }, } -impl<'a, T> BidirectionalChannelHalf { - pub fn send(&'a mut self, item: T) -> futures::sink::Send<'a, mpsc::Sender, T> { - self.tx.send(item) - } +/// Wraps the target of a half bridge with a channel to its half bridge companion. +/// +/// So when a message is received and published by this half, +/// the companion will wait for that message to be acknowledged by the target +/// before acknowledging to the source. 
+struct BridgeAsyncClient { + /// MQTT target for the messages + target: AsyncClient, + + /// Receives messages from the companion half bridge + rx: mpsc::Receiver>, + + /// Sends messages to a background task that forwards the messages to the target and companion + sender: BridgeMessageSender, + + /// Count of messages that have been published (excluding health messages) + published: Arc, - pub fn recv(&mut self) -> futures::stream::Next> { + /// Count of messages that have been acknowledged + acknowledged: Arc, +} + +impl BridgeAsyncClient { + pub fn recv(&mut self) -> futures::stream::Next>> { self.rx.next() } - pub fn clone_sender(&self) -> mpsc::Sender { - self.tx.clone() + pub fn clone_sender(&self) -> BridgeMessageSender { + self.sender.clone() + } + + fn new( + target: AsyncClient, + tx: mpsc::Sender>, + rx: mpsc::Receiver>, + ) -> Self { + let (unbounded_tx, unbounded_rx) = mpsc::unbounded(); + let companion_bridge_half = BridgeAsyncClient { + target, + rx, + sender: BridgeMessageSender { unbounded_tx }, + published: Arc::new(AtomicUsize::new(0)), + acknowledged: Arc::new(AtomicUsize::new(0)), + }; + companion_bridge_half.spawn_publisher(tx, unbounded_rx); + companion_bridge_half + } + + async fn publish(&mut self, target_topic: String, publish: Publish) { + self.sender.publish(target_topic, publish).await + } + + async fn ack(&mut self, publish: Publish) { + self.sender.ack(publish).await + } + + fn published(&self) -> usize { + self.published.load(Ordering::Relaxed) + } + + fn acknowledged(&self) -> usize { + self.acknowledged.load(Ordering::Relaxed) + } + + fn spawn_publisher( + &self, + mut tx: mpsc::Sender>, + mut unbounded_rx: mpsc::UnboundedReceiver, + ) { + let target = self.target.clone(); + let published = self.published.clone(); + let acknowledged = self.acknowledged.clone(); + tokio::spawn(async move { + while let Some(message) = unbounded_rx.next().await { + match message { + BridgeMessage::BridgePub { + target_topic, + publish, + } => { + let 
duplicate = (target_topic.clone(), publish.clone()); + tx.send(Some(duplicate)).await.unwrap(); + target + .publish(target_topic, publish.qos, publish.retain, publish.payload) + .await + .unwrap(); + published.fetch_add(1, Ordering::Relaxed); + } + BridgeMessage::Pub { publish } => { + tx.send(None).await.unwrap(); + target + .publish(publish.topic, publish.qos, publish.retain, publish.payload) + .await + .unwrap(); + } + BridgeMessage::BridgeAck { publish } => { + target.ack(&publish).await.unwrap(); + acknowledged.fetch_add(1, Ordering::Relaxed); + } + } + } + }); + } +} + +#[derive(Clone)] +struct BridgeMessageSender { + unbounded_tx: mpsc::UnboundedSender, +} + +impl BridgeMessageSender { + async fn internal_publish(&mut self, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::Pub { publish }) + .await + .unwrap() + } + + async fn publish(&mut self, target_topic: String, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgePub { + target_topic, + publish, + }) + .await + .unwrap() + } + + async fn ack(&mut self, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgeAck { publish }) + .await + .unwrap() } } @@ -296,11 +443,10 @@ impl<'a, T> BidirectionalChannelHalf { #[allow(clippy::too_many_arguments)] async fn half_bridge( mut recv_event_loop: EventLoop, - target: AsyncClient, recv_client: AsyncClient, + mut target: BridgeAsyncClient, transformer: TopicConverter, bidirectional_topic_filters: Vec>, - mut companion_bridge_half: BidirectionalChannelHalf>, tx_health: mpsc::Sender<(&'static str, Status)>, name: &'static str, topics: Vec, @@ -317,6 +463,10 @@ async fn half_bridge( let mut loop_breaker = MessageLoopBreaker::new(recv_client.clone(), bidirectional_topic_filters); + let mut received = 0; // Count of messages received by this half-bridge + let mut published = 0; // Count of messages published (by the companion) + let mut acknowledged = 0; // Count of messages acknowledged (by the MQTT end-point of the companion) 
+ loop { let res = recv_event_loop.poll().await; bridge_health.update(&res).await; @@ -336,10 +486,15 @@ async fn half_bridge( } }; debug!("Received notification ({name}) {notification:?}"); + debug!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={waiting} acknowledged={acknowledged} finalized={finalized}", + forwarded = target.published(), + waiting = forward_pkid_to_received_msg.len(), + finalized = target.acknowledged(), + ); match notification { Event::Incoming(Incoming::ConnAck(_)) => { - info!("Bridge cloud connection {name:?} subscribing to {topics:?}"); + info!("Bridge {name} connection subscribing to {topics:?}"); let recv_client = recv_client.clone(); let topics = topics.clone(); // We have to subscribe to this asynchronously (i.e. in a task) since we might at @@ -351,19 +506,12 @@ async fn half_bridge( Event::Incoming(Incoming::Publish(publish)) => { if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { if let Some(topic) = transformer.convert_topic(&publish.topic) { - target - .publish( - topic.clone(), - publish.qos, - publish.retain, - publish.payload.clone(), - ) - .await - .unwrap(); - companion_bridge_half - .send(Some((topic.into_owned(), publish))) - .await - .unwrap(); + received += 1; + target.publish(topic.to_string(), publish).await; + } else { + // The message is not forwarded to this bridge target, + // so it has to be acknowledged here + recv_client.ack(&publish).await.unwrap() } } } @@ -374,17 +522,20 @@ async fn half_bridge( | Incoming::PubRec(PubRec { pkid: ack_pkid }), ) => { if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) { - let target = target.clone(); - tokio::spawn(async move { target.ack(&msg).await.unwrap() }); + acknowledged += 1; + target.ack(msg).await; + } else { + info!("Bridge {name} connection received ack for unknown pkid={ack_pkid}"); } } // Keep track of packet IDs so we can acknowledge messages Event::Outgoing(Outgoing::Publish(pkid)) => { 
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { - match companion_bridge_half.recv().await { + match target.recv().await { // A message was forwarded by the other bridge half, note the packet id Some(Some((topic, msg))) => { + published += 1; loop_breaker.forward_on_topic(topic, &msg); if pkid != 0 { // Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap @@ -400,9 +551,18 @@ async fn half_bridge( None => break, } } else { - info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); + info!("Bridge {name} connection ignoring already known pkid={pkid}"); } } + + Event::Outgoing(Outgoing::AwaitAck(pkid)) => { + info!("Bridge {name} connection still waiting ack for pkid={pkid}"); + } + + Event::Incoming(Incoming::Disconnect) => { + info!("Bridge {name} connection closed by peer"); + } + _ => {} } }