Skip to content

Commit

Permalink
Merge pull request #3122 from didier-wenzek/fix/builtin-bridge-discon…
Browse files Browse the repository at this point in the history
…nected

fix: builtin bridge disconnected when under heavy load
  • Loading branch information
didier-wenzek committed Sep 13, 2024
2 parents a7e4d9f + 80be771 commit 1fcd3b3
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 64 deletions.
33 changes: 16 additions & 17 deletions crates/extensions/tedge_mqtt_bridge/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::overall_status;
use crate::BidirectionalChannelHalf;
use crate::BridgeAsyncClient;
use crate::BridgeMessageSender;
use crate::Status;
use futures::channel::mpsc;
use futures::SinkExt;
use futures::StreamExt;
use rumqttc::AsyncClient;
use rumqttc::ConnectionError;
use rumqttc::Event;
use rumqttc::Incoming;
use rumqttc::Publish;
use rumqttc::QoS;
use std::collections::HashMap;
use tracing::error;
Expand All @@ -17,24 +18,21 @@ use tracing::log::info;
///
/// When [Self::monitor] runs, this will watch the status of the bridge halves, and notify the
/// relevant MQTT topic about the overall health.
pub struct BridgeHealthMonitor<T> {
target: AsyncClient,
pub struct BridgeHealthMonitor {
topic: String,
rx_status: mpsc::Receiver<(&'static str, Status)>,
companion_bridge_half: mpsc::Sender<Option<T>>,
companion_bridge_half: BridgeMessageSender,
}

impl<T> BridgeHealthMonitor<T> {
impl BridgeHealthMonitor {
pub(crate) fn new(
target: AsyncClient,
topic: String,
bridge_half: &BidirectionalChannelHalf<Option<T>>,
bridge_half: &BridgeAsyncClient,
) -> (mpsc::Sender<(&'static str, Status)>, Self) {
let (tx, rx_status) = mpsc::channel(10);
(
tx,
BridgeHealthMonitor {
target,
topic,
rx_status,
companion_bridge_half: bridge_half.clone_sender(),
Expand All @@ -53,14 +51,15 @@ impl<T> BridgeHealthMonitor<T> {
if last_status != status {
last_status = status;

// TODO could this deadlock?
self.target
.publish(&self.topic, QoS::AtLeastOnce, true, status.unwrap().json())
.await
.unwrap();
// Send a note that a message has been published to maintain synchronisation
// between the two bridge halves
self.companion_bridge_half.send(None).await.unwrap();
let mut health_msg =
Publish::new(&self.topic, QoS::AtLeastOnce, status.unwrap().json());
health_msg.retain = true;

// Publish the health message over MQTT, but with no duplicate for the companion
// as this message doesn't have to be acknowledged
self.companion_bridge_half
.internal_publish(health_msg)
.await;
}
}
}
Expand Down
Loading

0 comments on commit 1fcd3b3

Please sign in to comment.