Skip to content

Commit

Permalink
Use custom topic prefix in service name
Browse files Browse the repository at this point in the history
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
  • Loading branch information
jarhodes314 committed Aug 22, 2024
1 parent e0f1082 commit a1051b7
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 48 deletions.
21 changes: 8 additions & 13 deletions crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,26 @@ use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AWS_MAPPER_NAME: &str = "tedge-mapper-aws";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-aws";

pub struct AwsMapper;

#[async_trait]
impl TEdgeComponent for AwsMapper {
fn session_name(&self) -> &str {
AWS_MAPPER_NAME
}

async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let prefix = &tedge_config.aws.bridge.topic_prefix;
let aws_mapper_name = format!("tedge-mapper-{prefix}");
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
start_basic_actors(&aws_mapper_name, &tedge_config).await?;

let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
if tedge_config.mqtt.bridge.built_in {
let device_id = tedge_config.device.id.try_read(&tedge_config)?;
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let rules = built_in_bridge_rules(device_id, &tedge_config.aws.bridge.topic_prefix)?;
let rules = built_in_bridge_rules(device_id, prefix)?;

let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
tedge_config.device.id.try_read(&tedge_config)?,
Expand All @@ -57,12 +52,12 @@ impl TEdgeComponent for AwsMapper {
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);
let bridge_name = format!("tedge-mapper-bridge-{prefix}");
let health_topic = service_health_topic(&mqtt_schema, &device_topic_id, &bridge_name);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
BUILT_IN_BRIDGE_NAME,
&bridge_name,
&health_topic,
rules,
cloud_config,
Expand All @@ -76,7 +71,7 @@ impl TEdgeComponent for AwsMapper {
clock,
mqtt_schema,
tedge_config.aws.mapper.timestamp_format,
tedge_config.aws.bridge.topic_prefix.clone(),
prefix.clone(),
);
let mut aws_converting_actor = ConvertingActor::builder("AwsConverter", aws_converter);

Expand Down
21 changes: 8 additions & 13 deletions crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,26 @@ use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-az";

pub struct AzureMapper;

#[async_trait]
impl TEdgeComponent for AzureMapper {
fn session_name(&self) -> &str {
AZURE_MAPPER_NAME
}

async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let prefix = &tedge_config.az.bridge.topic_prefix;
let az_mapper_name = format!("tedge-mapper-{prefix}");
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
start_basic_actors(&az_mapper_name, &tedge_config).await?;
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());

if tedge_config.mqtt.bridge.built_in {
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let remote_clientid = tedge_config.device.id.try_read(&tedge_config)?;
let topic_prefix = &tedge_config.az.bridge.topic_prefix;
let rules = built_in_bridge_rules(remote_clientid, topic_prefix)?;
let rules = built_in_bridge_rules(remote_clientid, prefix)?;

let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
remote_clientid,
Expand All @@ -66,12 +60,13 @@ impl TEdgeComponent for AzureMapper {
&tedge_config,
)?;

let built_in_bridge_name = format!("tedge-mapper-bridge-{prefix}");
let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);
service_health_topic(&mqtt_schema, &device_topic_id, &built_in_bridge_name);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
BUILT_IN_BRIDGE_NAME,
&built_in_bridge_name,
&health_topic,
rules,
cloud_config,
Expand All @@ -85,7 +80,7 @@ impl TEdgeComponent for AzureMapper {
Box::new(WallClock),
mqtt_schema,
tedge_config.az.mapper.timestamp_format,
&tedge_config.az.bridge.topic_prefix,
prefix,
);
let mut az_converting_actor = ConvertingActor::builder("AzConverter", az_converter);
az_converting_actor.connect_source(get_topic_filter(&tedge_config), &mut mqtt_actor);
Expand Down
29 changes: 15 additions & 14 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,19 @@ use tedge_mqtt_ext::MqttActorBuilder;
use tedge_timer_ext::TimerActor;
use tedge_uploader_ext::UploaderActor;

const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";

pub struct CumulocityMapper;

#[async_trait]
impl TEdgeComponent for CumulocityMapper {
fn session_name(&self) -> &str {
CUMULOCITY_MAPPER_NAME
}

async fn start(
&self,
tedge_config: TEdgeConfig,
cfg_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let prefix = &tedge_config.c8y.bridge.topic_prefix;
let c8y_mapper_name = format!("tedge-mapper-{prefix}");
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
start_basic_actors(&c8y_mapper_name, &tedge_config).await?;

let mqtt_config = tedge_config.mqtt_config()?;
let c8y_mapper_config = C8yMapperConfig::from_tedge_config(cfg_dir, &tedge_config)?;
Expand Down Expand Up @@ -143,7 +139,7 @@ impl TEdgeComponent for CumulocityMapper {
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.default_service_for_device(&c8y_mapper_name)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id(
Expand All @@ -154,7 +150,7 @@ impl TEdgeComponent for CumulocityMapper {
let last_will_message_mapper =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
&c8y_mapper_name,
service_type.as_str(),
"down",
)?;
Expand Down Expand Up @@ -210,8 +206,10 @@ impl TEdgeComponent for CumulocityMapper {
// set service down status on shutdown, using a last-will message.
// A separate MQTT actor/client is required as the last will message of the main MQTT actor
// is used to send down status to health topic.
let mut service_monitor_actor =
MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?);
let mut service_monitor_actor = MqttActorBuilder::new(service_monitor_client_config(
&c8y_mapper_name,
&tedge_config,
)?);

let mut c8y_mapper_actor = C8yMapperBuilder::try_new(
c8y_mapper_config,
Expand Down Expand Up @@ -260,7 +258,10 @@ impl TEdgeComponent for CumulocityMapper {
}
}

pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result<Config, anyhow::Error> {
pub fn service_monitor_client_config(
c8y_mapper_name: &str,
tedge_config: &TEdgeConfig,
) -> Result<Config, anyhow::Error> {
let main_device_xid: EntityExternalId = tedge_config.device.id.try_read(tedge_config)?.into();
let service_type = &tedge_config.service.ty;
let service_type = if service_type.is_empty() {
Expand All @@ -281,15 +282,15 @@ pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result<Confi
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.default_service_for_device(c8y_mapper_name)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id =
CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid);

let last_will_message = c8y_api::smartrest::inventory::service_creation_message(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
c8y_mapper_name,
service_type.as_str(),
"down",
&[],
Expand Down
6 changes: 1 addition & 5 deletions crates/core/tedge_mapper/src/collectd/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@ impl CollectdMapper {

#[async_trait]
impl TEdgeComponent for CollectdMapper {
fn session_name(&self) -> &str {
COLLECTD_MAPPER_NAME
}

async fn start(
&self,
tedge_config: TEdgeConfig,
_config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
start_basic_actors(COLLECTD_MAPPER_NAME, &tedge_config).await?;

let input_topic = CollectdMapper::input_topics();
let output_topic = CollectdMapper::output_topic();
Expand Down
2 changes: 0 additions & 2 deletions crates/core/tedge_mapper/src/core/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use tedge_config::TEdgeConfig;

#[async_trait]
pub trait TEdgeComponent: Sync + Send {
fn session_name(&self) -> &str;

async fn start(
&self,
tedge_config: TEdgeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Bridge stops if mapper stops running
Log ${measurements}
Execute Command systemctl stop tedge-mapper-c8y
Service Health Status Should Be Down tedge-mapper-bridge-custom-c8y-prefix
Service Health Status Should Be Down tedge-mapper-custom-c8y-prefix
Execute Command tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25'
${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature
Log ${measurements}
Expand Down Expand Up @@ -277,6 +278,6 @@ Custom Setup
ThinEdgeIO.Execute Command tedge config set mqtt.bridge.built_in true
ThinEdgeIO.Execute Command tedge config set c8y.bridge.topic_prefix custom-c8y-prefix
ThinEdgeIO.Execute Command tedge reconnect c8y
Service Health Status Should Be Up tedge-mapper-c8y
Service Health Status Should Be Up tedge-mapper-custom-c8y-prefix
${output}= Execute Command sudo tedge connect c8y --test
Should Contain ${output} Connection check to c8y cloud is successful.

0 comments on commit a1051b7

Please sign in to comment.