From de2cc45bae517446b14bc2c01e5eb24788f081c7 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 20 Aug 2024 18:36:29 +0200 Subject: [PATCH 01/11] Make the builtin action explicit This commit introduces no changes to the user. This is only a preparation step to replace builtin actions, whose usage was very restricted and specific, with builtin operations that can be used like any other operation. Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 15 +++- crates/core/tedge_api/src/workflow/mod.rs | 90 +++++++++++++++++-- crates/core/tedge_api/src/workflow/script.rs | 19 ++++ crates/core/tedge_api/src/workflow/state.rs | 23 ++++- .../tedge_api/src/workflow/toml_config.rs | 86 +++++++++++++++--- 5 files changed, 212 insertions(+), 21 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 4a1999914e..e4c15f4d28 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -205,11 +205,20 @@ impl WorkflowActor { let new_state = state.move_to(next_step); self.publish_command_state(new_state, &mut log_file).await } - OperationAction::BuiltIn => { + OperationAction::BuiltIn(_, _) => { let step = &state.status; info!("Processing {operation} operation {step} step"); + + // TODO Honor the await and exit handlers Ok(self.command_dispatcher.send(state).await?) } + OperationAction::BuiltInOperation(ref builtin_op, _) => { + let step = &state.status; + info!("Executing builtin:{builtin_op} operation {step} step"); + + let builtin_state = action.adapt_builtin_request(state); + Ok(self.command_dispatcher.send(builtin_state).await?) 
+ } OperationAction::AwaitingAgentRestart(handlers) => { let step = &state.status; info!("{operation} operation {step} waiting for agent restart"); @@ -380,6 +389,10 @@ impl WorkflowActor { &mut self, new_state: GenericCommandState, ) -> Result<(), RuntimeError> { + // TODO rewrite the command status + // depending the operation is executing, successful or failed + // set the new state using the user provided handlers. + if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) { error!("Fail to persist workflow operation state: {err}"); } diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 5754db26e9..0e67850b25 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -57,13 +57,18 @@ pub enum OperationAction { /// ``` MoveTo(GenericStateUpdate), - /// The built-in behavior is used + /// Implied built-in operation (for backward compatibility) + /// + /// - the operation name is derived from the workflow + /// - the step (trigger vs await) is derived from the command status (scheduled vs executing) /// /// ```toml /// action = "builtin" + /// on_exec = "" /// on_success = "" + /// on_error = "" /// ``` - BuiltIn, + BuiltIn(BgExitHandlers, AwaitHandlers), /// Await agent restart /// @@ -106,6 +111,14 @@ pub enum OperationAction { BgExitHandlers, ), + /// Trigger a built-in operation + /// + /// ```toml + /// operation = "" + /// on_exec = "" + /// ``` + BuiltInOperation(OperationName, BgExitHandlers), + /// Await the completion of a sub-operation /// /// The sub-operation is stored in the command state. 
@@ -140,7 +153,7 @@ impl Display for OperationAction { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let str = match self { OperationAction::MoveTo(step) => format!("move to {step} state"), - OperationAction::BuiltIn => "builtin".to_string(), + OperationAction::BuiltIn(_, _) => "builtin action".to_string(), OperationAction::AwaitingAgentRestart { .. } => "await agent restart".to_string(), OperationAction::Script(script, _) => script.to_string(), OperationAction::BgScript(script, _) => script.to_string(), @@ -151,6 +164,9 @@ impl Display for OperationAction { script ), }, + OperationAction::BuiltInOperation(operation, _) => { + format!("execute builtin:{operation}") + } OperationAction::AwaitOperationCompletion { .. } => { "await sub-operation completion".to_string() } @@ -212,10 +228,22 @@ impl OperationWorkflow { /// Create a built-in operation workflow pub fn built_in(operation: OperationType) -> Self { + let operation_name = operation.to_string(); + let exec_handler = BgExitHandlers::builtin_default(); + let await_handler = AwaitHandlers::builtin_default(); let states = [ ("init", OperationAction::MoveTo("scheduled".into())), - ("scheduled", OperationAction::BuiltIn), - ("executing", OperationAction::BuiltIn), + ( + "scheduled", + OperationAction::BuiltInOperation(operation_name.clone(), exec_handler), + ), + ( + "executing", + OperationAction::AwaitOperationCompletion( + await_handler, + StateExcerpt::whole_payload(), + ), + ), ("successful", OperationAction::Clear), ("failed", OperationAction::Clear), ] @@ -384,6 +412,58 @@ impl OperationAction { Ok(new_state) } + + /// Rewrite a command state before pushing it to a builtin operation actor + /// + /// Depending the action is to trigger or await the operation, + /// set the status to schedule or executing. + /// + /// Return the command state unchanged if there is no appropriate substitute. 
+ pub fn adapt_builtin_request(&self, command_state: GenericCommandState) -> GenericCommandState { + match self { + OperationAction::BuiltInOperation(_, _) => command_state.update("scheduled".into()), + _ => command_state, + } + } + + /// Rewrite the command state returned by a builtin operation actor + /// + /// Depending the operation is executing, successful or failed, + /// set the new state using the user provided handlers + /// + /// Return the command state unchanged if there is no appropriate handlers. + pub fn adapt_builtin_response( + &self, + command_state: GenericCommandState, + ) -> GenericCommandState { + match self { + OperationAction::BuiltIn(exec_handlers, _) + | OperationAction::BuiltInOperation(_, exec_handlers) + if command_state.is_executing() => + { + command_state.update(exec_handlers.on_exec.clone()) + } + OperationAction::BuiltIn(_, await_handlers) + | OperationAction::AwaitOperationCompletion(await_handlers, _) + if command_state.is_successful() => + { + command_state.update(await_handlers.on_success.clone()) + } + OperationAction::BuiltIn(_, await_handlers) + | OperationAction::AwaitOperationCompletion(await_handlers, _) + if command_state.is_failed() => + { + let mut on_error = await_handlers.on_error.clone(); + if on_error.reason.is_none() { + if let Some(builtin_reason) = command_state.failure_reason() { + on_error.reason = Some(builtin_reason.to_string()); + } + } + command_state.update(on_error) + } + _ => command_state, + } + } } #[derive(thiserror::Error, Debug, Eq, PartialEq)] diff --git a/crates/core/tedge_api/src/workflow/script.rs b/crates/core/tedge_api/src/workflow/script.rs index 3ed09d9a57..417dead63d 100644 --- a/crates/core/tedge_api/src/workflow/script.rs +++ b/crates/core/tedge_api/src/workflow/script.rs @@ -311,6 +311,14 @@ impl BgExitHandlers { } } +impl BgExitHandlers { + pub fn builtin_default() -> Self { + BgExitHandlers { + on_exec: GenericStateUpdate::executing(), + } + } +} + /// Define how to await the 
completion of a command #[derive(Clone, Debug, Eq, PartialEq)] pub struct AwaitHandlers { @@ -320,6 +328,17 @@ pub struct AwaitHandlers { pub on_timeout: GenericStateUpdate, } +impl AwaitHandlers { + pub fn builtin_default() -> Self { + AwaitHandlers { + timeout: None, + on_success: GenericStateUpdate::successful(), + on_error: GenericStateUpdate::unknown_error(), + on_timeout: GenericStateUpdate::timeout(), + } + } +} + /// Define state transition on each iteration outcome #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct IterateHandlers { diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index cac75fc8f9..bfc2afdbf2 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -64,6 +64,7 @@ pub struct GenericStateUpdate { const STATUS: &str = "status"; const INIT: &str = "init"; +const EXECUTING: &str = "executing"; const SUCCESSFUL: &str = "successful"; const FAILED: &str = "failed"; const REASON: &str = "reason"; @@ -421,15 +422,19 @@ impl GenericCommandState { } pub fn is_init(&self) -> bool { - matches!(self.status.as_str(), INIT) + self.status.as_str() == INIT + } + + pub fn is_executing(&self) -> bool { + self.status.as_str() == EXECUTING } pub fn is_successful(&self) -> bool { - matches!(self.status.as_str(), SUCCESSFUL) + self.status.as_str() == SUCCESSFUL } pub fn is_failed(&self) -> bool { - matches!(self.status.as_str(), FAILED) + self.status.as_str() == FAILED } pub fn is_finished(&self) -> bool { @@ -450,6 +455,13 @@ impl GenericStateUpdate { json!({STATUS: INIT}) } + pub fn executing() -> Self { + GenericStateUpdate { + status: EXECUTING.to_string(), + reason: None, + } + } + pub fn successful() -> Self { GenericStateUpdate { status: SUCCESSFUL.to_string(), @@ -583,6 +595,11 @@ pub enum StateExcerpt { } impl StateExcerpt { + /// Excerpt returning the whole payload of a command state + pub fn whole_payload() -> Self { + 
StateExcerpt::PathExpr("${.}".to_string()) + } + /// Extract a JSON value from the input state pub fn extract_value_from(&self, input: &GenericCommandState) -> Value { match self { diff --git a/crates/core/tedge_api/src/workflow/toml_config.rs b/crates/core/tedge_api/src/workflow/toml_config.rs index 457e70fc56..1e6c2faaa0 100644 --- a/crates/core/tedge_api/src/workflow/toml_config.rs +++ b/crates/core/tedge_api/src/workflow/toml_config.rs @@ -131,17 +131,26 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { let handlers = TryInto::::try_into((input.handlers, defaults))?; Ok(OperationAction::BgScript(script, handlers)) } - TomlOperationAction::Operation(operation) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; - let input_script = input.input_script; - let cmd_input = input.input.try_into()?; - Ok(OperationAction::Operation( - operation, - input_script, - cmd_input, - handlers, - )) - } + TomlOperationAction::Operation(operation) => match operation.strip_prefix("builtin:") { + None => { + let handlers = TryInto::::try_into((input.handlers, defaults))?; + let input_script = input.input_script; + let cmd_input = input.input.try_into()?; + Ok(OperationAction::Operation( + operation, + input_script, + cmd_input, + handlers, + )) + } + Some(builtin_operation_name) => { + let handlers = TryInto::::try_into((input.handlers, defaults))?; + Ok(OperationAction::BuiltInOperation( + builtin_operation_name.to_string(), + handlers, + )) + } + }, TomlOperationAction::Iterate(target_json_path) => { let handlers = TryInto::::try_into((input.handlers, defaults))?; let Some(json_path) = GenericCommandState::extract_path(&target_json_path) else { @@ -152,7 +161,6 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { Ok(OperationAction::Iterate(json_path.to_string(), handlers)) } TomlOperationAction::Action(command) => match command.as_str() { - "builtin" => Ok(OperationAction::BuiltIn), "cleanup" => 
Ok(OperationAction::Clear), "proceed" => { let on_success: GenericStateUpdate = input @@ -173,6 +181,17 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { handlers, cmd_output, )) } + "builtin" => { + let exec_handlers = TryInto::::try_into(( + input.handlers.clone(), + BgExitHandlers::builtin_default(), + ))?; + let await_handlers = TryInto::::try_into(( + input.handlers, + AwaitHandlers::builtin_default(), + ))?; + Ok(OperationAction::BuiltIn(exec_handlers, await_handlers)) + } _ => Err(WorkflowDefinitionError::UnknownAction { action: command }), }, } @@ -304,6 +323,17 @@ impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for BgExitHandlers { } } +impl TryFrom<(TomlExitHandlers, BgExitHandlers)> for BgExitHandlers { + type Error = ScriptDefinitionError; + + fn try_from( + (value, defaults): (TomlExitHandlers, BgExitHandlers), + ) -> Result { + let on_exec = value.on_exec.map(|u| u.into()).or(Some(defaults.on_exec)); + BgExitHandlers::try_new(on_exec) + } +} + impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for AwaitHandlers { type Error = ScriptDefinitionError; @@ -336,6 +366,38 @@ impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for AwaitHandlers { } } +impl TryFrom<(TomlExitHandlers, AwaitHandlers)> for AwaitHandlers { + type Error = ScriptDefinitionError; + + fn try_from( + (handlers, defaults): (TomlExitHandlers, AwaitHandlers), + ) -> Result { + let timeout = handlers + .timeout_second + .map(Duration::from_secs) + .or(defaults.timeout); + let on_success: GenericStateUpdate = handlers + .on_success + .map(|u| u.into()) + .unwrap_or(defaults.on_success); + let on_error = handlers + .on_error + .map(|u| u.into()) + .unwrap_or(defaults.on_error); + let on_timeout = handlers + .on_timeout + .map(|u| u.into()) + .unwrap_or(defaults.on_timeout); + + Ok(AwaitHandlers { + timeout, + on_success, + on_error, + on_timeout, + }) + } +} + impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for IterateHandlers { type Error = 
WorkflowDefinitionError; From 3a3a7d7a90747444b02ee53ab35e920b9ddfe0fa Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 22 Aug 2024 12:16:39 +0200 Subject: [PATCH 02/11] Clarify name and role of workflow processing methods Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 39 ++++++++++++------- .../src/operation_workflows/builder.rs | 2 +- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index e4c15f4d28..a57893a76c 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -51,7 +51,7 @@ pub struct WorkflowActor { pub(crate) state_repository: AgentStateRepository, pub(crate) log_dir: Utf8PathBuf, pub(crate) input_receiver: UnboundedLoggingReceiver, - pub(crate) command_dispatcher: CommandDispatcher, + pub(crate) builtin_command_dispatcher: CommandDispatcher, pub(crate) command_sender: DynSender, pub(crate) mqtt_publisher: LoggingSender, pub(crate) script_runner: ClientMessageBox>, @@ -73,16 +73,15 @@ impl Actor for WorkflowActor { self.process_mqtt_message(message).await?; } AgentInput::InternalCommandState(InternalCommandState(command_state)) => { - self.process_internal_state_update(command_state).await?; + self.process_command_update(command_state).await?; } AgentInput::GenericCommandData(GenericCommandData::State(new_state)) => { - self.process_command_state_update(new_state).await?; + self.process_builtin_command_update(new_state).await?; } AgentInput::GenericCommandData(GenericCommandData::Metadata( GenericCommandMetadata { operation, payload }, )) => { - self.publish_operation_capability(operation, payload) - .await?; + self.publish_builtin_capability(operation, payload).await?; } } } @@ -101,7 +100,7 @@ impl WorkflowActor { Ok(()) } - async fn publish_operation_capability( + async fn publish_builtin_capability( &mut self, 
operation: OperationName, payload: serde_json::Value, @@ -117,6 +116,11 @@ impl WorkflowActor { Ok(()) } + /// Process a command update received from MQTT + /// + /// Beware, these updates are coming from external components (the mapper inits and clears commands), + /// but also from *this* actor as all its state transitions are published over MQTT. + /// Only the former will be actually processed with [Self::process_command_update]. async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> { let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else { log::error!("Unknown command channel: {}", &message.topic.name); @@ -136,7 +140,7 @@ impl WorkflowActor { Ok(Some(new_state)) => { self.persist_command_board().await?; if new_state.is_init() { - self.process_internal_state_update(new_state.set_log_path(&log_file.path)) + self.process_command_update(new_state.set_log_path(&log_file.path)) .await?; } } @@ -152,7 +156,13 @@ impl WorkflowActor { Ok(()) } - async fn process_internal_state_update( + /// Process a command state update taking any action as defined by the workflow + /// + /// A new state can be received: + /// - from MQTT as for init and clear messages + /// - from the engine itself when a progress is made + /// - from one of the builtin operation actors + async fn process_command_update( &mut self, state: GenericCommandState, ) -> Result<(), RuntimeError> { @@ -210,14 +220,14 @@ impl WorkflowActor { info!("Processing {operation} operation {step} step"); // TODO Honor the await and exit handlers - Ok(self.command_dispatcher.send(state).await?) + Ok(self.builtin_command_dispatcher.send(state).await?) } OperationAction::BuiltInOperation(ref builtin_op, _) => { let step = &state.status; info!("Executing builtin:{builtin_op} operation {step} step"); let builtin_state = action.adapt_builtin_request(state); - Ok(self.command_dispatcher.send(builtin_state).await?) 
+ Ok(self.builtin_command_dispatcher.send(builtin_state).await?) } OperationAction::AwaitingAgentRestart(handlers) => { let step = &state.status; @@ -385,7 +395,10 @@ impl WorkflowActor { } } - async fn process_command_state_update( + /// Pre-process an update received from a builtin operation actor + /// + /// The actual work will be done by [Self::process_command_update]. + async fn process_builtin_command_update( &mut self, new_state: GenericCommandState, ) -> Result<(), RuntimeError> { @@ -400,7 +413,7 @@ impl WorkflowActor { self.mqtt_publisher .send(new_state.clone().into_message()) .await?; - self.process_internal_state_update(new_state).await + self.process_command_update(new_state).await } fn open_command_log( @@ -453,7 +466,7 @@ impl WorkflowActor { match self.state_repository.load().await { Ok(Some(pending_commands)) => { for command in self.workflows.load_pending_commands(pending_commands) { - self.process_internal_state_update(command.clone()).await?; + self.process_command_update(command.clone()).await?; } } Ok(None) => {} diff --git a/crates/core/tedge_agent/src/operation_workflows/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs index 7fa0d7029d..1d4d846ace 100644 --- a/crates/core/tedge_agent/src/operation_workflows/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -137,7 +137,7 @@ impl Builder for WorkflowActorBuilder { state_repository: repository, log_dir: self.config.log_dir, input_receiver: self.input_receiver, - command_dispatcher: self.command_dispatcher, + builtin_command_dispatcher: self.command_dispatcher, mqtt_publisher: self.mqtt_publisher, command_sender: self.command_sender, script_runner: self.script_runner, From 1aee7a79d31f78dd2dd2798214c38774d98b416e Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 22 Aug 2024 16:03:47 +0200 Subject: [PATCH 03/11] Use user-provided exec and await handlers for builtin actions Signed-off-by: Didier Wenzek --- 
.../src/operation_workflows/actor.rs | 42 +++++++++++++------ .../src/operation_workflows/tests.rs | 12 +++--- crates/core/tedge_api/src/workflow/mod.rs | 7 ++-- crates/core/tedge_api/src/workflow/state.rs | 21 ++++++++++ .../core/tedge_api/src/workflow/supervisor.rs | 29 ++++++++++++- .../workflows/custom_operation.robot | 12 +++--- .../tedge_agent/workflows/software_list.toml | 10 +++-- 7 files changed, 101 insertions(+), 32 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index a57893a76c..b2cb34c111 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -219,14 +219,20 @@ impl WorkflowActor { let step = &state.status; info!("Processing {operation} operation {step} step"); - // TODO Honor the await and exit handlers Ok(self.builtin_command_dispatcher.send(state).await?) } - OperationAction::BuiltInOperation(ref builtin_op, _) => { + OperationAction::BuiltInOperation(ref builtin_op, ref handlers) => { let step = &state.status; info!("Executing builtin:{builtin_op} operation {step} step"); - let builtin_state = action.adapt_builtin_request(state); + // Fork a builtin state + let builtin_state = action.adapt_builtin_request(state.clone()); + + // Move to the next state to await the builtin operation outcome + let new_state = state.update(handlers.on_exec.clone()); + self.publish_command_state(new_state, &mut log_file).await?; + + // Forward the command to the builtin operation actor Ok(self.builtin_command_dispatcher.send(builtin_state).await?) 
} OperationAction::AwaitingAgentRestart(handlers) => { @@ -326,7 +332,7 @@ impl WorkflowActor { } OperationAction::AwaitOperationCompletion(handlers, output_excerpt) => { let step = &state.status; - info!("{operation} operation {step} waiting for sub-operation completion"); + info!("{operation} operation {step}: waiting for sub-operation completion"); // Get the sub-operation state and resume this command when the sub-operation is in a terminal state if let Some(sub_state) = self @@ -368,8 +374,6 @@ impl WorkflowActor { )) .await; } - } else { - log_file.log_info("=> sub-operation not yet launched").await; }; Ok(()) @@ -402,18 +406,32 @@ impl WorkflowActor { &mut self, new_state: GenericCommandState, ) -> Result<(), RuntimeError> { - // TODO rewrite the command status - // depending the operation is executing, successful or failed - // set the new state using the user provided handlers. + if new_state.is_finished() { + self.finalize_builtin_command_update(new_state).await + } else { + // As not finalized, the builtin state is sent back + // to the builtin operation actor for further processing. + let builtin_state = new_state.clone(); + Ok(self.builtin_command_dispatcher.send(builtin_state).await?) + } + } - if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) { + /// Finalize a builtin operation + /// + /// Moving to the next step calling [Self::process_command_update]. 
+ async fn finalize_builtin_command_update( + &mut self, + new_state: GenericCommandState, + ) -> Result<(), RuntimeError> { + let adapted_state = self.workflows.adapt_builtin_response(new_state); + if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) { error!("Fail to persist workflow operation state: {err}"); } self.persist_command_board().await?; self.mqtt_publisher - .send(new_state.clone().into_message()) + .send(adapted_state.clone().into_message()) .await?; - self.process_command_update(new_state).await + self.process_command_update(adapted_state).await } fn open_command_log( diff --git a/crates/core/tedge_agent/src/operation_workflows/tests.rs b/crates/core/tedge_agent/src/operation_workflows/tests.rs index c2e6b8060d..ab8cd39dc4 100644 --- a/crates/core/tedge_agent/src/operation_workflows/tests.rs +++ b/crates/core/tedge_agent/src/operation_workflows/tests.rs @@ -204,13 +204,13 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> { SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string()); let software_list_response = software_list_request .clone() - .with_status(CommandStatus::Executing); + .with_status(CommandStatus::Successful); software_box.send(software_list_response.into()).await?; mqtt_box .assert_received([MqttMessage::new( &Topic::new_unchecked("te/device/main///cmd/software_list/1234"), - r#"{"status":"executing"}"#, + r#"{"status":"successful"}"#, ) .with_retain()]) .await; @@ -286,13 +286,13 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> { // Simulate SoftwareUpdate response message received. 
let software_update_request = SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string()); - let software_update_response = software_update_request.with_status(CommandStatus::Executing); + let software_update_response = software_update_request.with_status(CommandStatus::Successful); software_box.send(software_update_response.into()).await?; mqtt_box .assert_received([MqttMessage::new( &Topic::new_unchecked("te/device/main///cmd/software_update/1234"), - r#"{"status":"executing"}"#, + r#"{"status":"successful"}"#, ) .with_retain()]) .await; @@ -325,7 +325,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> { let executing_response = RestartCommand { target: EntityTopicId::default_main_device(), cmd_id: "abc".to_string(), - payload: RestartCommandPayload::new(CommandStatus::Executing), + payload: RestartCommandPayload::new(CommandStatus::Successful), }; restart_box.send(executing_response).await?; @@ -335,7 +335,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> { .map(|msg| (msg.topic, msg.payload)) .expect("MqttMessage"); assert_eq!(topic.name, "te/device/main///cmd/restart/abc"); - assert!(format!("{:?}", payload).contains(r#"status":"executing"#)); + assert!(format!("{:?}", payload).contains(r#"status":"successful"#)); Ok(()) } diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 0e67850b25..5ba95bb802 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -415,13 +415,12 @@ impl OperationAction { /// Rewrite a command state before pushing it to a builtin operation actor /// - /// Depending the action is to trigger or await the operation, - /// set the status to schedule or executing. - /// /// Return the command state unchanged if there is no appropriate substitute. 
pub fn adapt_builtin_request(&self, command_state: GenericCommandState) -> GenericCommandState { match self { - OperationAction::BuiltInOperation(_, _) => command_state.update("scheduled".into()), + OperationAction::BuiltInOperation(_, _) => { + command_state.update(GenericStateUpdate::scheduled()) + } _ => command_state, } } diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index bfc2afdbf2..ccfd7fdd4c 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -64,6 +64,7 @@ pub struct GenericStateUpdate { const STATUS: &str = "status"; const INIT: &str = "init"; +const SCHEDULED: &str = "scheduled"; const EXECUTING: &str = "executing"; const SUCCESSFUL: &str = "successful"; const FAILED: &str = "failed"; @@ -207,6 +208,19 @@ impl GenericCommandState { self.update_with_json(json_update) } + /// Merge this state into a more complete state overriding all values defined both side + pub fn merge_into(self, mut state: Self) -> Self { + state.status = self.status; + if let Some(properties) = state.payload.as_object_mut() { + if let Value::Object(new_properties) = self.payload { + for (key, value) in new_properties.into_iter() { + properties.insert(key, value); + } + } + } + state + } + /// Update the command state with a new status describing the next state pub fn move_to(mut self, update: GenericStateUpdate) -> Self { let status = update.status; @@ -455,6 +469,13 @@ impl GenericStateUpdate { json!({STATUS: INIT}) } + pub fn scheduled() -> Self { + GenericStateUpdate { + status: SCHEDULED.to_string(), + reason: None, + } + } + pub fn executing() -> Self { GenericStateUpdate { status: EXECUTING.to_string(), diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 4221e6b8d7..2d18317170 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ 
-131,6 +131,33 @@ impl WorkflowSupervisor { self.commands.get_state(command).map(|(_, state)| state) } + /// Rewrite the command state returned by a builtin operation actor + /// + /// Depending the operation is executing, successful or failed, + /// set the new state using the user provided handlers + /// + /// This method also takes care of the fact that the builtin operations + /// only return the state properties they care about. + /// Hence the command state is merged into the persisted state of the command. + /// + /// Return the command state unchanged if there is an error or no appropriate handlers. + pub fn adapt_builtin_response( + &self, + command_state: GenericCommandState, + ) -> GenericCommandState { + let command_id = &command_state.topic; + if let Some(current_state) = self.get_state(command_id.as_ref()) { + let new_state = command_state.merge_into(current_state.clone()); + if let Ok(current_action) = self.get_action(current_state) { + return current_action.adapt_builtin_response(new_state); + } else { + return new_state; + } + }; + + command_state + } + /// Return the state of the invoking command of a command, if any pub fn invoking_command_state( &self, @@ -165,8 +192,6 @@ impl WorkflowSupervisor { } /// Update the state of the command board on reception of new state for a command - /// - /// Return the next CommandRequest state if any is required. 
pub fn apply_internal_update( &mut self, new_command_state: GenericCommandState, diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot index ab0079739f..61685dc32d 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot @@ -26,15 +26,17 @@ Trigger Custom Download Operation Should Be Equal ${actual_log} ${expected_log} Override Built-In Operation - Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '{"status":"init"}' + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '{"status":"init"}' ${software_list} Should Have MQTT Messages ... te/device/main///cmd/software_list/robot-456 ... message_pattern=.*successful.* ... maximum=1 - Should Contain ${software_list[0]} "currentSoftwareList" - Should Contain ${software_list[0]} "mosquitto" - Should Contain ${software_list[0]} "tedge" - Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '' + Should Contain ${software_list[0]} "currentSoftwareList" + Should Contain ${software_list[0]} "mosquitto" + Should Contain ${software_list[0]} "tedge" + Should Contain ${software_list[0]} "postprocess" + Should Contain ${software_list[0]} "done" + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '' Trigger Device Restart Using A Sub-Command [Documentation] To detect if the device has been rebooted, a marker file is created in the /run directory diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml index 0b78617ba9..25d91c6cac 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml +++ b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml @@ -5,11 +5,15 @@ 
script = "/etc/tedge/operations/init-software-list.sh ${.topic}" # The json outp on_stdout = ["scheduled"] [scheduled] -action = "builtin" -on_success = "executing" +operation = "builtin:software_list" +on_exec = "executing" [executing] -action = "builtin" +action = "await-operation-completion" +on_success = "postprocess" + +[postprocess] +script = "/etc/tedge/operations/echo-as-json.sh postprocess done" on_success = "successful" [successful] From 55a2ceb7f459c3cce9f84ddbf90663b3e91d243c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 22 Aug 2024 16:08:48 +0200 Subject: [PATCH 04/11] Rename BgExitHandlers -> ExecHandlers The new name is more appropriate, now that these handlers are used not only for background scripts but also to trigger a sub operation or a builtin action. Signed-off-by: Didier Wenzek --- crates/core/tedge_api/src/workflow/mod.rs | 10 ++++---- crates/core/tedge_api/src/workflow/script.rs | 12 +++++----- .../tedge_api/src/workflow/toml_config.rs | 24 +++++++++---------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 5ba95bb802..73015278ee 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -68,7 +68,7 @@ pub enum OperationAction { /// on_success = "" /// on_error = "" /// ``` - BuiltIn(BgExitHandlers, AwaitHandlers), + BuiltIn(ExecHandlers, AwaitHandlers), /// Await agent restart /// @@ -94,7 +94,7 @@ pub enum OperationAction { /// background_script = "sudo systemctl restart tedge-agent" /// on_exec = "" /// ``` - BgScript(ShellScript, BgExitHandlers), + BgScript(ShellScript, ExecHandlers), /// Trigger an operation and move to the next state from where the outcome of the operation will be awaited /// @@ -108,7 +108,7 @@ pub enum OperationAction { OperationName, Option, StateExcerpt, - BgExitHandlers, + ExecHandlers, ), /// Trigger a built-in operation @@ -117,7 +117,7 @@ pub enum 
OperationAction { /// operation = "" /// on_exec = "" /// ``` - BuiltInOperation(OperationName, BgExitHandlers), + BuiltInOperation(OperationName, ExecHandlers), /// Await the completion of a sub-operation /// @@ -229,7 +229,7 @@ impl OperationWorkflow { /// Create a built-in operation workflow pub fn built_in(operation: OperationType) -> Self { let operation_name = operation.to_string(); - let exec_handler = BgExitHandlers::builtin_default(); + let exec_handler = ExecHandlers::builtin_default(); let await_handler = AwaitHandlers::builtin_default(); let states = [ ("init", OperationAction::MoveTo("scheduled".into())), diff --git a/crates/core/tedge_api/src/workflow/script.rs b/crates/core/tedge_api/src/workflow/script.rs index 417dead63d..3fa4a2a80e 100644 --- a/crates/core/tedge_api/src/workflow/script.rs +++ b/crates/core/tedge_api/src/workflow/script.rs @@ -297,23 +297,23 @@ fn extract_script_output(stdout: String) -> Option { None } -/// Define how to handle a background script +/// Define how to handle background scripts and actions #[derive(Clone, Debug, Default, Eq, PartialEq)] -pub struct BgExitHandlers { +pub struct ExecHandlers { pub on_exec: GenericStateUpdate, } -impl BgExitHandlers { +impl ExecHandlers { pub fn try_new(on_exec: Option) -> Result { - Ok(BgExitHandlers { + Ok(ExecHandlers { on_exec: on_exec.unwrap_or_else(GenericStateUpdate::successful), }) } } -impl BgExitHandlers { +impl ExecHandlers { pub fn builtin_default() -> Self { - BgExitHandlers { + ExecHandlers { on_exec: GenericStateUpdate::executing(), } } diff --git a/crates/core/tedge_api/src/workflow/toml_config.rs b/crates/core/tedge_api/src/workflow/toml_config.rs index 1e6c2faaa0..c8f5fcf59b 100644 --- a/crates/core/tedge_api/src/workflow/toml_config.rs +++ b/crates/core/tedge_api/src/workflow/toml_config.rs @@ -1,7 +1,7 @@ use crate::mqtt_topics::OperationType; use crate::workflow::AwaitHandlers; -use crate::workflow::BgExitHandlers; use crate::workflow::DefaultHandlers; +use 
crate::workflow::ExecHandlers; use crate::workflow::ExitHandlers; use crate::workflow::GenericCommandState; use crate::workflow::GenericStateUpdate; @@ -128,12 +128,12 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { Ok(OperationAction::Script(script, handlers)) } TomlOperationAction::BackgroundScript(script) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = TryInto::::try_into((input.handlers, defaults))?; Ok(OperationAction::BgScript(script, handlers)) } TomlOperationAction::Operation(operation) => match operation.strip_prefix("builtin:") { None => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = TryInto::::try_into((input.handlers, defaults))?; let input_script = input.input_script; let cmd_input = input.input.try_into()?; Ok(OperationAction::Operation( @@ -144,7 +144,7 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { )) } Some(builtin_operation_name) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = TryInto::::try_into((input.handlers, defaults))?; Ok(OperationAction::BuiltInOperation( builtin_operation_name.to_string(), handlers, @@ -182,9 +182,9 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { )) } "builtin" => { - let exec_handlers = TryInto::::try_into(( + let exec_handlers = TryInto::::try_into(( input.handlers.clone(), - BgExitHandlers::builtin_default(), + ExecHandlers::builtin_default(), ))?; let await_handlers = TryInto::::try_into(( input.handlers, @@ -312,25 +312,23 @@ impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for ExitHandlers { } } -impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for BgExitHandlers { +impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for ExecHandlers { type Error = ScriptDefinitionError; fn try_from( (value, _defaults): (TomlExitHandlers, DefaultHandlers), ) -> Result { let on_exec = value.on_exec.map(|u| u.into()); - 
BgExitHandlers::try_new(on_exec) + ExecHandlers::try_new(on_exec) } } -impl TryFrom<(TomlExitHandlers, BgExitHandlers)> for BgExitHandlers { +impl TryFrom<(TomlExitHandlers, ExecHandlers)> for ExecHandlers { type Error = ScriptDefinitionError; - fn try_from( - (value, defaults): (TomlExitHandlers, BgExitHandlers), - ) -> Result { + fn try_from((value, defaults): (TomlExitHandlers, ExecHandlers)) -> Result { let on_exec = value.on_exec.map(|u| u.into()).or(Some(defaults.on_exec)); - BgExitHandlers::try_new(on_exec) + ExecHandlers::try_new(on_exec) } } From a699181b1a40fde16fa201a27858f108b140d2db Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 23 Aug 2024 12:32:30 +0200 Subject: [PATCH 05/11] Improve documentation of builtin actions in a workflow Signed-off-by: Didier Wenzek --- .../references/agent/operation-workflow.md | 116 ++++++++++++++---- 1 file changed, 89 insertions(+), 27 deletions(-) diff --git a/docs/src/references/agent/operation-workflow.md b/docs/src/references/agent/operation-workflow.md index 815dff9018..8c4c9e7cd1 100644 --- a/docs/src/references/agent/operation-workflow.md +++ b/docs/src/references/agent/operation-workflow.md @@ -464,21 +464,20 @@ The set of accepted handlers for an action are the following: - `timeout_second = 3600` the number of second given to the action to execute - `on_timeout = { status = "", reason = ""}` defines the next state when the action is not be completed within the time limit -For some action, notably a device `restart`, the handlers are limited to one: -- `on_exec = ""` defines the next state once the action has been launched in the background. - The action outcome will have to be observed in this `on_exec` state. 
- Currently, here are the available actions: - `await-agent-restart` awaits for **tedge-agent** to restart - `await-operation-completion` awaits for a sub-operation to reach a success, failure or timeout -- `builtin` is used when a builtin operation is overwritten by a custom workflow and indicates that for that state - the builtin action has to be applied. - `proceed` is a no-op action, simply proceeding to the next state, which is useful when a builtin operation is customized but no specific behavior has to be added on a workflow extension point. - `cleanup` marks the terminal state of the workflow where the command has been fully processed and where the original requester is expected to clean up the command retained message storing its state. - +- ( *deprecated* ) `builtin` is used when a builtin operation is overwritten by a custom workflow and indicates that for that state + the builtin action has to be applied. For backward compatibility, this keyword is rewritten by the agent + as a combination of `operation = "builtin:<operation>"` and `action = "await-operation-completion"`. +- ( *deprecated* ) `restart` triggers a device restart. For backward compatibility, this keyword is rewritten by the agent + as a combination of `operation = "restart"` and `action = "await-operation-completion"`. + #### Awaiting the agent to restart When the expected outcome of a script is to restart the device or the agent, @@ -555,62 +554,125 @@ action = "cleanup" action = "cleanup" ``` -#### Proceed and Builtin actions +#### Proceed -The `"proceed"` and `"builtin"` actions are useful when customizing a builtin operation -(`software_list`, `software_update`, `restart`, `config_snapshot`, `config_update`, `log_upload`). -Indeed, the first step is to start with a workflow specification which mimics the builtin behavior. +`proceed` simply lets the command proceed to the next state.
-For instance, here is the builtin workflow for the `software_update` operation: +Adding such a no-op step helps to later customize the workflow +without changing the observable sequence of steps a command has to go through. + +This is notably used by all the builtin operations that proceed from the *init* state to the *scheduled* state: ```toml -operation = "software_update" # an operation for which tedge-agent provides an implementation +[init] +action = "proceed" +on_success = "scheduled" +``` + +Adding some pre-processing step is then simply done by replacing the `proceed` action with something more specific: + +```toml +[init] +script = "/usr/bin/check-if-operation-is-timely.sh ${.}" +on_success = "scheduled" +on_error = { status = "failed", reason = "not timely" } +``` + +### 🚧 Customizing builtin operations + +:::info +🚧 The syntax for customizing builtin workflows is still being finalized so please avoid using it in production environments. +::: + +__tedge-agent__ supports out-of-the-box a set of so-called builtin operations: +`software_list`, `software_update`, `restart`, `config_snapshot`, `config_update`, `log_upload`. + +The workflows of these builtin operations can be customized. + +For each, there are *two* workflows: an internal workflow and a customized version of the former. +- The `builtin:<operation>` operation workflow describes the builtin behavior as provided by __tedge-agent__. +- The `<operation>` operation workflow rules the actual behavior of __tedge-agent__, + possibly invoking `builtin:<operation>` behind the scenes. +- When a user or a mapper triggers a command over MQTT for that `<operation>`, + by publishing an init message on `te/+/+/+/+/cmd/<operation>/+`, + __tedge-agent__ uses the customized version of the workflow for that operation. + - From MQTT, there is no way to trigger directly the builtin version of an operation, + i.e. there is no `te/+/+/+/+/cmd/builtin:<operation>/+` command topic. + - The builtin versions are only triggered internally by the agent from a customized workflow.
+- On a fresh install of %%te%%, the `` is simply a copy of `builtin:`. + - This copy is not materialized on the file system, but created in memory by the agent when no customized version is found. +- A customized version for an `` is provided as a TOML workflow definition file in `/etc/tedge/operations` + for that operation, as for any user provided workflow. + - By convention, this file is named after the operation name, as in `/etc/tedge/operations/software_update.toml`. + However, this is not mandatory: the operation is determined by the `operation` property, e.g. `operation = "software_update"`. + - If this definition is valid, then it will be used by the agent as the `` workflow. +- The customized version of an `` can invoke its builtin version + with a step triggering `operation = "builtin:"`. + - A customized workflow can also be a complete rework of the feature, ignoring the builtin behavior. +- A user-provided workflow can invoke any builtin operations as well as the custom versions. + - `operation = ""` triggers the custom-version of the operation workflow. + - `operation = "builtin:"` triggers the operation ignoring any customization. + +In order to customize a builtin operation, the first step is to materialize its definition in `/etc/tedge/operations`. 
+For instance, here is the builtin workflow for the `software_update` operation: + +```toml title="/etc/tedge/operations/software_update.toml" +operation = "software_update" # any builtin operation can be customized ["init"] -action = "proceed" # open to customization +action = "proceed" # open to customization on_success = "scheduled" [scheduled] -action = "builtin" # delegated to the tedge-agent -on_success = "executing" +operation = "builtin:software_update" # trigger the built-in behavior for software update +on_exec = "executing" [executing] -action = "builtin" # delegated to the tedge-agent +action = "await-operation-completion" # awaiting the builtin operation to complete on_success = "successful" [successful] -action = "cleanup" # waiting for the mapper to clean up the command +action = "cleanup" # waiting for the mapper to clean up the command [failed] -action = "cleanup" # waiting for the mapper to clean up the command +action = "cleanup" # waiting for the mapper to clean up the command ``` The action for the `"init"` state is a `"proceed"` action, meaning nothing specific is done by the __tedge-agent__ and that a user can provide its own implementation. -By contrast, the actions marked as `"builtin"` are those delegated to the __tedge-agent__ -and where the main task of the operation is performed, in that case, installing software. +By contrast, for the `scheduled` and `executing` states, the work is delegated to the __tedge-agent__ +and this is where the main task of the operation is performed, in that case, installing software. Here is a customized version of the same operation.
-```toml +```toml title="/etc/tedge/operations/software_update.toml" operation = "software_update" # a customized workflow [init] -script = "/usr/bin/schedule-software-update.sh ${.}" # checking is the software update command is timely -on_success = ["scheduled"] +script = "/usr/bin/schedule-software-update.sh ${.}" # one can override any `proceed` action - here with a checking step +on_success = "scheduled" on_error = { status = "failed", reason = "not timely" } [scheduled] -action = "builtin" # the software installation steps are unchanged +operation = "builtin:software_update" # trigger the built-in behavior for software update on_success = "executing" [executing] -action = "builtin" +action = "await-operation-completion" # awaiting the builtin operation to complete +on_success = "commit" # with customized chaining rules +on_error = "rollback" + +[commit] +script = "/usr/bin/commit-software-update.sh ${.}" # one can add extra steps - here a commit step on_success = "successful" +[rollback] +script = "/usr/bin/rollback-software-update.sh ${.}" # one can add extra steps - here a rollback step +on_success = "failed" + [successful] -action = "cleanup" +action = "cleanup" # terminal steps cannot be changed [failed] action = "cleanup" From f273967391fd41b2e4636cde9293ea20b2b98d6d Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 27 Aug 2024 10:29:26 +0200 Subject: [PATCH 06/11] Simplify operation workflow test Signed-off-by: Didier Wenzek --- .../tests/tedge_agent/workflows/custom_operation.robot | 1 - .../tests/tedge_agent/workflows/init-software-list.sh | 8 -------- .../tests/tedge_agent/workflows/software_list.toml | 2 +- 3 files changed, 1 insertion(+), 10 deletions(-) delete mode 100755 tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot index 61685dc32d..50600c2e03 100644 --- 
a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot @@ -198,7 +198,6 @@ Custom Setup Copy Configuration Files ThinEdgeIO.Transfer To Device ${CURDIR}/software_list.toml /etc/tedge/operations/ - ThinEdgeIO.Transfer To Device ${CURDIR}/init-software-list.sh /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/custom-download.toml /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/schedule-download.sh /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/launch-download.sh /etc/tedge/operations/ diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh b/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh deleted file mode 100755 index 6a2c754dcd..0000000000 --- a/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -set -e - -echo new software list request topic = "$1" >>/tmp/operations.log - -echo ':::begin-tedge:::' -echo '{ "status":"scheduled" }' -echo ':::end-tedge:::' diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml index 25d91c6cac..37309240ee 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml +++ b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml @@ -1,7 +1,7 @@ operation = "software_list" # A built in operation can be overridden [init] -script = "/etc/tedge/operations/init-software-list.sh ${.topic}" # The json output of the script is used for the next step +script = "/etc/tedge/operations/echo-as-json.sh status scheduled" on_stdout = ["scheduled"] [scheduled] From 104a318e6db531bc30bfc8605d9614765d8b2781 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 4 Sep 2024 14:41:40 +0200 Subject: [PATCH 07/11] Remove unused methods Signed-off-by: Didier Wenzek --- 
crates/core/tedge_api/src/commands.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/crates/core/tedge_api/src/commands.rs b/crates/core/tedge_api/src/commands.rs index 5a7ffacc2c..32c91857b4 100644 --- a/crates/core/tedge_api/src/commands.rs +++ b/crates/core/tedge_api/src/commands.rs @@ -731,23 +731,6 @@ pub enum CommandStatus { Unknown, } -impl CommandStatus { - pub fn is_terminal_status(&self) -> bool { - matches!( - self, - CommandStatus::Successful | CommandStatus::Failed { reason: _ } - ) - } - - pub fn is_successful(&self) -> bool { - *self == CommandStatus::Successful - } - - pub fn is_failed(&self) -> bool { - matches!(self, CommandStatus::Failed { reason: _ }) - } -} - fn default_failure_reason() -> String { "Unknown reason".to_string() } From dcd9812f0584eda9afe9ad6749dbe881f2913a96 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 6 Sep 2024 14:36:22 +0200 Subject: [PATCH 08/11] A builtin:operation can only be invoked from the same operation Signed-off-by: Didier Wenzek --- crates/core/tedge_api/src/workflow/error.rs | 6 ++++++ crates/core/tedge_api/src/workflow/mod.rs | 16 ++++++++++++++++ docs/src/references/agent/operation-workflow.md | 5 ++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/crates/core/tedge_api/src/workflow/error.rs b/crates/core/tedge_api/src/workflow/error.rs index 6cd8291ba4..c8545fe54e 100644 --- a/crates/core/tedge_api/src/workflow/error.rs +++ b/crates/core/tedge_api/src/workflow/error.rs @@ -20,6 +20,12 @@ pub enum WorkflowDefinitionError { #[error("The provided target {0} is not a valid path expression")] InvalidPathExpression(String), + + #[error("The `builtin:{builtin_operation}` cannot be invoked from `{main_operation}`, but only from `{builtin_operation}`")] + InvalidBuiltinOperation { + main_operation: String, + builtin_operation: String, + }, } /// Error related to a script definition diff --git a/crates/core/tedge_api/src/workflow/mod.rs 
b/crates/core/tedge_api/src/workflow/mod.rs index 73015278ee..baf5cd92d3 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -218,6 +218,22 @@ impl OperationWorkflow { }); } + let main_operation = operation.to_string(); + for (_, action) in states.iter() { + match action { + // A `builtin:<operation>` can only be invoked from the same `<operation>` + OperationAction::BuiltInOperation(builtin_operation, _) + if builtin_operation != &main_operation => + { + return Err(WorkflowDefinitionError::InvalidBuiltinOperation { + main_operation, + builtin_operation: builtin_operation.clone(), + }) + } + _ => continue, + } + } + Ok(OperationWorkflow { operation, built_in: false, diff --git a/docs/src/references/agent/operation-workflow.md b/docs/src/references/agent/operation-workflow.md index 8c4c9e7cd1..9ad9e8b1c1 100644 --- a/docs/src/references/agent/operation-workflow.md +++ b/docs/src/references/agent/operation-workflow.md @@ -609,9 +609,8 @@ For each, there are *two* workflows: an internal workflow and a customized versi - The customized version of an `` can invoke its builtin version with a step triggering `operation = "builtin:"`. - A customized workflow can also be a complete rework of the feature, ignoring the builtin behavior. -- A user-provided workflow can invoke any builtin operations as well as the custom versions. - - `operation = ""` triggers the custom-version of the operation workflow. - - `operation = "builtin:"` triggers the operation ignoring any customization. + - However, the builtin behavior can only be invoked from the workflow for the same operation + (e.g. `builtin:software_update` can only be invoked from `software_update`). In order to customize a builtin operation, the first step is to materialize its definition in `/etc/tedge/operations`.
For instance, here is the builtin workflow for the `software_update` operation: From 31f6c8c587e8225c527d05e93a420fbcae687291 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 6 Sep 2024 15:59:36 +0200 Subject: [PATCH 09/11] Check that builtin actions are still supported even if deprecated Signed-off-by: Didier Wenzek --- .../tests/cumulocity/log/workflow/log_upload.sh | 9 +++++++++ .../tests/cumulocity/log/workflow/log_upload.toml | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh index b5db88102a..b7c78614fc 100755 --- a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh +++ b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh @@ -29,6 +29,12 @@ Parameters: EOT } +postprocess_sqlite() { + LOG_TYPE="$1" + TMP_LOG_FILE=/tmp/${LOG_TYPE}.log + rm -f "$TMP_LOG_FILE" +} + # # Main # @@ -52,6 +58,9 @@ case "$COMMAND" in ;; esac ;; + postprocess) + postprocess_sqlite "$@" + ;; esac exit 0 diff --git a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml index 5fa1fbb475..7b4f7dc030 100644 --- a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml +++ b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml @@ -20,6 +20,10 @@ on_error = "failed" [executing] action = "builtin" + on_success = "postprocess" # on_success & on_error can be customized for builtin actions + +[postprocess] + script = "/usr/bin/log_upload.sh ${.payload.status}" on_success = "successful" [successful] From c5adbe44feb5abca46424e25e0030138fc420205 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 10 Sep 2024 10:45:48 +0200 Subject: [PATCH 10/11] A builtin operation can be triggered on the executing step Signed-off-by: Didier Wenzek --- .../workflows/custom_operation.robot | 8 +++++ .../workflows/software_update.toml | 
29 +++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot index 50600c2e03..40cfe84d78 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot @@ -38,6 +38,13 @@ Override Built-In Operation Should Contain ${software_list[0]} "done" Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '' +Override Built-In Operation Executing Step + # Trigger a software update using a custom software-update workflow with rollbacks + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_update/test-builtin-executing-step '{"status":"init","updateList":[{"type":"apt","modules":[{"name":"broken-package","version":"latest","action":"install"}]}]}' + Should Have MQTT Messages te/device/main///cmd/software_update/test-builtin-executing-step message_pattern=.*rollback.* minimum=1 + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_update/test-builtin-executing-step '' + Trigger Device Restart Using A Sub-Command [Documentation] To detect if the device has been rebooted, a marker file is created in the /run directory ... 
which should be deleted when the device is restarted @@ -198,6 +205,7 @@ Custom Setup Copy Configuration Files ThinEdgeIO.Transfer To Device ${CURDIR}/software_list.toml /etc/tedge/operations/ + ThinEdgeIO.Transfer To Device ${CURDIR}/software_update.toml /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/custom-download.toml /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/schedule-download.sh /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/launch-download.sh /etc/tedge/operations/ diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml b/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml new file mode 100644 index 0000000000..bd280f3f2b --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml @@ -0,0 +1,29 @@ +operation = "software_update" + +[init] +action = "proceed" +on_success = "scheduled" + +[scheduled] +action = "proceed" +on_success = "executing" + +[executing] +operation = "builtin:software_update" # trigger the built-in behavior for software update +on_exec = "await_operation" + +[await_operation] +action = "await-operation-completion" # awaiting the builtin operation to complete +on_success = "successful" +on_error = "rollback" + +[rollback] +script = "/etc/tedge/operations/echo-as-json.sh rollback done" +on_success = "failed" +on_error = "failed" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" From 7ad7db48b4ce1bf4e600d24c29684f87c0129c52 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 11 Sep 2024 09:50:21 +0200 Subject: [PATCH 11/11] Use T::try_from instead of TryInto::::try_into Signed-off-by: Didier Wenzek --- .../tedge_api/src/workflow/toml_config.rs | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/crates/core/tedge_api/src/workflow/toml_config.rs b/crates/core/tedge_api/src/workflow/toml_config.rs index c8f5fcf59b..d9ae45f210 100644 --- 
a/crates/core/tedge_api/src/workflow/toml_config.rs +++ b/crates/core/tedge_api/src/workflow/toml_config.rs @@ -124,16 +124,16 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { ) -> Result { match input.action { TomlOperationAction::Script(script) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExitHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::Script(script, handlers)) } TomlOperationAction::BackgroundScript(script) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::BgScript(script, handlers)) } TomlOperationAction::Operation(operation) => match operation.strip_prefix("builtin:") { None => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; let input_script = input.input_script; let cmd_input = input.input.try_into()?; Ok(OperationAction::Operation( @@ -144,7 +144,7 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { )) } Some(builtin_operation_name) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::BuiltInOperation( builtin_operation_name.to_string(), handlers, @@ -152,7 +152,7 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { } }, TomlOperationAction::Iterate(target_json_path) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = IterateHandlers::try_from((input.handlers, defaults))?; let Some(json_path) = GenericCommandState::extract_path(&target_json_path) else { return Err(WorkflowDefinitionError::InvalidPathExpression( target_json_path, @@ -171,22 +171,22 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { Ok(OperationAction::MoveTo(on_success)) } 
"await-agent-restart" => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = AwaitHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::AwaitingAgentRestart(handlers)) } "await-operation-completion" => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = AwaitHandlers::try_from((input.handlers, defaults))?; let cmd_output = input.output.try_into()?; Ok(OperationAction::AwaitOperationCompletion( handlers, cmd_output, )) } "builtin" => { - let exec_handlers = TryInto::::try_into(( + let exec_handlers = ExecHandlers::try_from(( input.handlers.clone(), ExecHandlers::builtin_default(), ))?; - let await_handlers = TryInto::::try_into(( + let await_handlers = AwaitHandlers::try_from(( input.handlers, AwaitHandlers::builtin_default(), ))?; @@ -203,11 +203,10 @@ impl TryFrom for OperationWorkflow { fn try_from(input: TomlOperationWorkflow) -> Result { let operation = input.operation; - let default_handlers = TryInto::::try_into(input.handlers)?; + let default_handlers = DefaultHandlers::try_from(input.handlers)?; let mut states = HashMap::new(); for (state, action_spec) in input.states.into_iter() { - let action = - TryInto::::try_into((action_spec, default_handlers.clone()))?; + let action = OperationAction::try_from((action_spec, default_handlers.clone()))?; states.insert(state, action); } @@ -585,7 +584,7 @@ on_exit.0 = "0" on_success = "success" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnSuccessHandler) } @@ -596,7 +595,7 @@ on_exit._ = "wildcard" on_error = "error" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, 
ScriptDefinitionError::DuplicatedOnErrorHandler) } @@ -607,7 +606,7 @@ on_exit.1-5 = "1-5" on_exit.4-8 = "4-8" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!( error, ScriptDefinitionError::OverlappingHandler { @@ -623,7 +622,7 @@ on_exit.4-8 = "4-8" on_exit.5-1 = "oops" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!( error, ScriptDefinitionError::IncorrectRange { from: 5, to: 1 } @@ -637,7 +636,7 @@ on_success = "successful_state" on_stdout = ["other_successful_state_extracted_from_json"] "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnStdoutHandler) } @@ -648,7 +647,7 @@ on_exit.0 = "successful_state" on_stdout = ["other_successful_state_extracted_from_json"] "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnStdoutHandler) } @@ -656,7 +655,7 @@ on_stdout = ["other_successful_state_extracted_from_json"] fn default_handlers() { let file = ""; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let handlers = TryInto::::try_into(input).unwrap(); + let handlers = ExitHandlers::try_from(input).unwrap(); assert_eq!(handlers.state_update_on_success().status, "successful"); assert_eq!( handlers.state_update_on_exit("foo.sh", 1).reason.unwrap(),