diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs
index 41ce2f0121..d8fe6c9e3d 100644
--- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs
+++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs
@@ -1,7 +1,6 @@
 use super::handlers::EntityTarget;
 use super::handlers::OperationContext;
 use super::handlers::OperationMessage;
-use super::handlers::UpdateStatus;
 use crate::actor::IdDownloadRequest;
 use crate::actor::IdDownloadResult;
 use crate::actor::IdUploadRequest;
@@ -11,6 +10,7 @@ use crate::Capabilities;
 use c8y_api::http_proxy::C8yEndPoint;
 use c8y_auth_proxy::url::ProxyUrlGenerator;
 use c8y_http_proxy::handle::C8YHttpProxy;
+use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tedge_actors::ClientMessageBox;
@@ -19,8 +19,11 @@ use tedge_api::mqtt_topics::Channel;
 use tedge_api::mqtt_topics::ChannelFilter;
 use tedge_api::mqtt_topics::EntityFilter;
 use tedge_api::mqtt_topics::IdGenerator;
+use tedge_api::workflow::GenericCommandState;
 use tedge_mqtt_ext::MqttMessage;
-use tokio::task::JoinError;
+use tracing::debug;
+use tracing::error;
+use tracing::warn;
 
 /// Handles operations.
 ///
@@ -138,23 +141,74 @@ impl OperationHandler {
             message,
         };
 
-        let topic: Arc<str> = message.message.topic.name.clone().into();
+        let topic = Arc::from(message.message.topic.name.as_str());
 
-        let running_operation = self.running_operations.remove(&topic);
+        let status = match GenericCommandState::from_command_message(&message.message) {
+            Ok(command) if command.is_cleared() => None,
+            Ok(command) => Some(command.status),
+            Err(err) => {
+                error!(%err, ?message, "could not parse command payload");
+                return;
+            }
+        };
 
-        let running_operation =
-            running_operation.unwrap_or_else(|| RunningOperation::spawn(Arc::clone(&self.context)));
+        let current_operation = self.running_operations.entry(topic);
 
-        let operation_status = running_operation
-            .update(message)
-            .await
-            .expect("operation task should not panic");
+        match current_operation {
+            Entry::Vacant(entry) => {
+                let Some(status) = status else {
+                    debug!(topic = %entry.key(), "unexpected clearing message");
+                    return;
+                };
+
+                let context = Arc::clone(&self.context);
+                let handle = tokio::spawn(async move { context.update(message).await });
+
+                let running_operation = RunningOperation { handle, status };
+
+                entry.insert(running_operation);
+            }
+
+            Entry::Occupied(entry) => {
+                let previous_status = entry.get().status.as_str();
+                if status.as_ref().is_some_and(|s| *s == previous_status) {
+                    debug!(
+                        "already handling operation message with this topic and status, ignoring"
+                    );
+                    return;
+                }
 
-        match operation_status {
-            OperationStatus::Ongoing(operation) => {
-                self.running_operations.insert(topic, operation);
+                // if handling a clearing message, wait for the task to finish
+                let Some(status) = status else {
+                    let operation = entry.remove();
+                    operation
+                        .handle
+                        .await
+                        .expect("operation task should not panic");
+                    return;
+                };
+
+                // we got a new status, check if it's not invalid and then await previous one and
+                // handle the new one
+                if !is_operation_status_transition_valid(previous_status, &status) {
+                    warn!(
+                        topic = %entry.key(),
+                        previous = previous_status,
+                        next = status,
+                        "attempted invalid status transition, ignoring"
+                    );
+                    return;
+                }
+
+                let (key, operation) = entry.remove_entry();
+                let context = Arc::clone(&self.context);
+                let handle = tokio::spawn(async move {
+                    operation.handle.await.unwrap();
+                    context.update(message).await;
+                });
+                let running_operation = RunningOperation { handle, status };
+                self.running_operations.insert(key, running_operation);
             }
-            OperationStatus::Terminated => {}
         }
     }
 
@@ -206,51 +260,27 @@ impl OperationHandler {
         topics
    }
 }
-
 struct RunningOperation {
     handle: tokio::task::JoinHandle<()>,
-    tx: tokio::sync::mpsc::UnboundedSender<OperationMessage>,
+    status: String,
 }
 
-impl RunningOperation {
-    /// Spawns a task that handles the operation.
-    ///
-    /// The task handles a single operation with a given command id, and via a channel it receives
-    /// operation state changes (if any) to drive an operation to completion.
-    fn spawn(operation: Arc<OperationContext>) -> Self {
-        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
-
-        let handle = tokio::spawn(async move {
-            while let Some(message) = rx.recv().await {
-                if let UpdateStatus::Terminated = operation.update(message).await {
-                    break;
-                }
-            }
-        });
+// TODO: logic of which status transitions are valid should be defined in tedge_api and be
+// considered together with custom statuses of custom workflows
+fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool {
+    #[allow(clippy::match_like_matches_macro)]
+    match (previous, next) {
+        // not really a transition but false to make sure we're not sending multiple smartrest msgs
+        (prev, next) if prev == next => false,
 
-        Self { handle, tx }
-    }
+        // successful and failed are terminal, can't change them
+        ("successful", _) => false,
+        ("failed", _) => false,
 
-    /// Updates the operation with new state.
-    ///
-    /// Can keep an operation running, or terminate it. Returns an error if operation panicked.
-    async fn update(self, message: OperationMessage) -> Result<OperationStatus, JoinError> {
-        let send_result = self.tx.send(message);
-
-        if send_result.is_err() {
-            self.handle.await?;
-            Ok(OperationStatus::Terminated)
-        } else {
-            Ok(OperationStatus::Ongoing(self))
-        }
+        _ => true,
    }
 }
 
-enum OperationStatus {
-    Ongoing(RunningOperation),
-    Terminated,
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -472,6 +502,230 @@ mod tests {
         assert_eq!(sut.running_operations.len(), 0);
     }
 
+    #[tokio::test]
+    async fn ignores_malformed_command_payloads() {
+        let TestHandle {
+            operation_handler: mut sut,
+            ttd: _ttd,
+            ..
+        } = setup_operation_handler();
+
+        let mqtt_schema = sut.context.mqtt_schema.clone();
+
+        let entity_topic_id = EntityTopicId::default_main_device();
+        let entity_target = EntityTarget {
+            topic_id: entity_topic_id.clone(),
+            external_id: EntityExternalId::from("anything"),
+            smartrest_publish_topic: Topic::new("anything").unwrap(),
+        };
+
+        let command_topic = mqtt_schema.topic_for(
+            &entity_topic_id,
+            &Channel::Command {
+                operation: OperationType::ConfigSnapshot,
+                cmd_id: "config-snapshot-1".to_string(),
+            },
+        );
+
+        let invalid_command_message = MqttMessage::new(&command_topic, "invalid command payload");
+
+        sut.handle(entity_target, invalid_command_message).await;
+
+        assert!(!sut
+            .running_operations
+            .contains_key(command_topic.name.as_str()));
+    }
+
+    #[tokio::test]
+    async fn ignores_unexpected_clearing_messages() {
+        let TestHandle {
+            operation_handler: mut sut,
+            ttd: _ttd,
+            ..
+        } = setup_operation_handler();
+
+        let mqtt_schema = sut.context.mqtt_schema.clone();
+
+        let entity_topic_id = EntityTopicId::default_main_device();
+        let entity_target = EntityTarget {
+            topic_id: entity_topic_id.clone(),
+            external_id: EntityExternalId::from("anything"),
+            smartrest_publish_topic: Topic::new("anything").unwrap(),
+        };
+
+        let config_snapshot_operation = ConfigSnapshotCmd {
+            target: entity_topic_id,
+            cmd_id: "config-snapshot-1".to_string(),
+            payload: ConfigSnapshotCmdPayload {
+                status: CommandStatus::Executing,
+                tedge_url: Some("asdf".to_string()),
+                config_type: "typeA".to_string(),
+                path: None,
+                log_path: None,
+            },
+        };
+        let clearing_message = config_snapshot_operation.clearing_message(&mqtt_schema);
+        let clearing_message_topic = clearing_message.topic.name.clone();
+
+        sut.handle(entity_target, clearing_message).await;
+
+        assert!(!sut
+            .running_operations
+            .contains_key(clearing_message_topic.as_str()));
+    }
+
+    #[tokio::test]
+    async fn shouldnt_process_duplicate_messages() {
+        let TestHandle {
+            operation_handler: mut sut,
+            downloader: dl,
+            uploader: ul,
+            mqtt,
+            c8y_proxy,
+            ttd: _ttd,
+            ..
+        } = setup_operation_handler();
+
+        let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
+        let _dl = dl.with_timeout(TEST_TIMEOUT_MS);
+        let _ul = ul.with_timeout(TEST_TIMEOUT_MS);
+        let _c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS);
+
+        let mqtt_schema = sut.context.mqtt_schema.clone();
+
+        let entity_topic_id = EntityTopicId::default_main_device();
+        let entity_target = EntityTarget {
+            topic_id: entity_topic_id.clone(),
+            external_id: EntityExternalId::from("anything"),
+            smartrest_publish_topic: Topic::new("anything").unwrap(),
+        };
+
+        let config_snapshot_operation = ConfigSnapshotCmd {
+            target: entity_topic_id,
+            cmd_id: "config-snapshot-1".to_string(),
+            payload: ConfigSnapshotCmdPayload {
+                status: CommandStatus::Executing,
+                tedge_url: Some("asdf".to_string()),
+                config_type: "typeA".to_string(),
+                path: None,
+                log_path: None,
+            },
+        };
+
+        // check that if the same message is handled 3 times by mistake, we don't process it multiple times
+        for _ in 0..3 {
+            sut.handle(
+                entity_target.clone(),
+                config_snapshot_operation.command_message(&mqtt_schema),
+            )
+            .await;
+        }
+
+        let smartrest_executing_message = mqtt.recv().await.unwrap();
+        assert_eq!(
+            smartrest_executing_message.payload_str().unwrap(),
+            "501,c8y_UploadConfigFile"
+        );
+
+        assert_eq!(
+            mqtt.recv().await,
+            None,
+            "shouldn't receive duplicates of EXECUTING message"
+        )
+    }
+
+    #[tokio::test]
+    async fn shouldnt_process_invalid_status_transitions() {
+        tedge_config::system_services::set_log_level(tracing::Level::DEBUG);
+        let TestHandle {
+            operation_handler: mut sut,
+            ttd: _ttd,
+            ..
+        } = setup_operation_handler();
+
+        let mqtt_schema = sut.context.mqtt_schema.clone();
+
+        let entity_topic_id = EntityTopicId::default_main_device();
+        let entity_target = EntityTarget {
+            topic_id: entity_topic_id.clone(),
+            external_id: EntityExternalId::from("anything"),
+            smartrest_publish_topic: Topic::new("anything").unwrap(),
+        };
+
+        let failed_message = ConfigSnapshotCmd {
+            target: entity_topic_id.clone(),
+            cmd_id: "config-snapshot-1".to_string(),
+            payload: ConfigSnapshotCmdPayload {
+                status: CommandStatus::Failed {
+                    reason: "test".to_string(),
+                },
+                tedge_url: Some("asdf".to_string()),
+                config_type: "typeA".to_string(),
+                path: None,
+                log_path: None,
+            },
+        };
+
+        let successful_message = ConfigSnapshotCmd {
+            target: entity_topic_id,
+            cmd_id: "config-snapshot-2".to_string(),
+            payload: ConfigSnapshotCmdPayload {
+                status: CommandStatus::Successful,
+                tedge_url: Some("asdf".to_string()),
+                config_type: "typeA".to_string(),
+                path: None,
+                log_path: None,
+            },
+        };
+
+        let failed_message_mqtt = failed_message.command_message(&mqtt_schema);
+        let failed_topic = failed_message_mqtt.topic.name.as_str();
+        sut.handle(entity_target.clone(), failed_message_mqtt.clone())
+            .await;
+        assert_eq!(
+            &sut.running_operations.get(failed_topic).unwrap().status,
+            "failed"
+        );
+
+        let successful_message_mqtt = successful_message.command_message(&mqtt_schema);
+        let successful_topic = successful_message_mqtt.topic.name.as_str();
+        sut.handle(entity_target.clone(), successful_message_mqtt.clone())
+            .await;
+        assert_eq!(
+            &sut.running_operations
+                .get(successful_message_mqtt.topic.name.as_str())
+                .unwrap()
+                .status,
+            "successful"
+        );
+
+        // status shouldn't change from successful/failed to executing
+        let executing_message = failed_message.with_status(CommandStatus::Executing);
+        sut.handle(
+            entity_target.clone(),
+            executing_message.command_message(&mqtt_schema),
+        )
+        .await;
+        assert_eq!(
+            &sut.running_operations
+                .get(dbg!(failed_topic))
+                .unwrap()
+                .status,
+            "failed"
+        );
+
+        let executing_message = successful_message.with_status(CommandStatus::Executing);
+        sut.handle(
+            entity_target.clone(),
+            executing_message.command_message(&mqtt_schema),
+        )
+        .await;
+        assert_eq!(
+            &sut.running_operations.get(successful_topic).unwrap().status,
+            "successful"
+        );
+    }
+
     #[tokio::test]
     #[should_panic]
     async fn handle_should_panic_when_background_task_panics() {
diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs
index c16795f215..5bf7121dd2 100644
--- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs
+++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs
@@ -61,7 +61,7 @@ pub(super) struct OperationContext {
 }
 
 impl OperationContext {
-    pub async fn update(&self, message: OperationMessage) -> UpdateStatus {
+    pub async fn update(&self, message: OperationMessage) {
         let OperationMessage {
             entity,
             cmd_id,
@@ -74,7 +74,7 @@ impl OperationContext {
             Ok(command) => command,
             Err(err) => {
                 error!(%err, ?message, "could not parse command payload");
-                return UpdateStatus::Terminated;
+                return;
             }
         };
 
@@ -109,11 +109,11 @@ impl OperationContext {
                         }
                     }
                     // command is not yet finished, avoid clearing the command topic
-                    Ok(_) => return UpdateStatus::Ongoing,
+                    Ok(_) => return,
                 }
 
                 clear_command_topic(command, &mut mqtt_publisher).await;
-                return UpdateStatus::Terminated;
+                return;
             }
             OperationType::SoftwareUpdate => {
                 self.publish_software_update_status(&entity, &cmd_id, &message)
@@ -152,7 +152,7 @@ impl OperationContext {
             c8y_operation,
             &entity.smartrest_publish_topic,
         ) {
-            OperationOutcome::Ignored => UpdateStatus::Ongoing,
+            OperationOutcome::Ignored => {}
             OperationOutcome::Executing { mut extra_messages } => {
                 let c8y_state_executing_payload = set_operation_executing(c8y_operation);
                 let c8y_state_executing_message =
@@ -164,8 +164,6 @@
                 for message in messages {
                     mqtt_publisher.send(message).await.unwrap();
                 }
-
-                UpdateStatus::Ongoing
             }
             OperationOutcome::Finished { messages } => {
                 if let Err(e) = self
@@ -180,19 +178,11 @@ impl OperationContext {
                 }
 
                 clear_command_topic(command, &mut mqtt_publisher).await;
-
-                UpdateStatus::Terminated
             }
         }
     }
 }
 
-/// Whether or not this operation requires more messages to be handled or is it terminated.
-pub enum UpdateStatus {
-    Ongoing,
-    Terminated,
-}
-
 async fn clear_command_topic(
     command: GenericCommandState,
     mqtt_publisher: &mut LoggingSender<MqttMessage>,
@@ -276,6 +266,7 @@ fn to_c8y_operation(operation_type: &OperationType) -> Option
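
Below, outside the patch itself, is a minimal standalone sketch of the behaviour the new transition guard is meant to enforce. It copies is_operation_status_transition_valid as introduced in handler.rs above and exercises it with the plain status strings the handler compares ("executing", "successful", "failed"); the main() harness is illustrative only and not part of the patched crate.

// Standalone sketch: the transition guard from handler.rs, exercised directly.
fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool {
    match (previous, next) {
        // a repeated status is not a real transition; rejecting it avoids duplicate SmartREST messages
        (prev, next) if prev == next => false,
        // "successful" and "failed" are terminal states
        ("successful", _) => false,
        ("failed", _) => false,
        _ => true,
    }
}

fn main() {
    // normal progressions are accepted
    assert!(is_operation_status_transition_valid("executing", "successful"));
    assert!(is_operation_status_transition_valid("executing", "failed"));
    // duplicates and transitions out of terminal states are rejected
    assert!(!is_operation_status_transition_valid("executing", "executing"));
    assert!(!is_operation_status_transition_valid("successful", "executing"));
    assert!(!is_operation_status_transition_valid("failed", "executing"));
}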