Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor control interface #345

Merged
merged 12 commits into from
Aug 15, 2024
6 changes: 3 additions & 3 deletions agent/doc/swdesign/drawio/unit_overview.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions agent/src/agent_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl AgentManager {

#[cfg(test)]
mod tests {
use super::*;
use super::RuntimeManager;
use crate::agent_manager::AgentManager;
use crate::workload_state::{
workload_state_store::{mock_parameter_storage_new_returns, MockWorkloadStateStore},
Expand All @@ -205,7 +205,7 @@ mod tests {
objects::{generate_test_workload_spec_with_param, ExecutionState},
to_server_interface::ToServer,
};
use mockall::predicate::*;
use mockall::predicate::eq;
use tokio::{join, sync::mpsc::channel};

const BUFFER_SIZE: usize = 20;
Expand Down
6 changes: 3 additions & 3 deletions agent/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::path::Path;

#[cfg_attr(test, mockall_double::double)]
use crate::control_interface::Directory;
use crate::control_interface::FileSystemError;
use crate::control_interface::directory::Directory;
use crate::control_interface::filesystem::FileSystemError;
use clap::Parser;
use common::DEFAULT_SERVER_ADDRESS;

Expand Down Expand Up @@ -89,7 +89,7 @@ pub fn parse() -> Arguments {
mod tests {
use common::DEFAULT_SERVER_ADDRESS;

use super::*;
use super::{Arguments, FileSystemError, Path, DEFAULT_RUN_FOLDER};
use crate::control_interface::generate_test_directory_mock;

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,44 @@
//
// SPDX-License-Identifier: Apache-2.0

pub mod authorizer;
pub mod control_interface_info;
mod control_interface_task;
pub mod directory;
mod fifo;
pub mod filesystem;
mod from_server_channels;
mod input_output;
mod reopen_file;
mod to_ankaios;

pub use to_ankaios::ToAnkaios;

#[cfg(test)]
pub use control_interface_info::MockControlInterfaceInfo;
#[cfg(test)]
pub use directory::generate_test_directory_mock;
#[cfg(test)]
pub use fifo::MockFifo;
#[cfg(test)]
pub use filesystem::MockFileSystem;

use common::objects::WorkloadInstanceName;
use common::{from_server_interface::FromServerSender, to_server_interface::ToServerSender};

#[cfg(test)]
use mockall::automock;

#[cfg_attr(test, mockall_double::double)]
use super::authorizer::Authorizer;
use authorizer::Authorizer;
#[cfg_attr(test, mockall_double::double)]
use super::input_output::InputOutput;
use control_interface_task::ControlInterfaceTask;
#[cfg_attr(test, mockall_double::double)]
use super::pipes_channel_task::PipesChannelTask;
use from_server_channels::FromServerChannels;
#[cfg_attr(test, mockall_double::double)]
use super::reopen_file::ReopenFile;
use input_output::InputOutput;
#[cfg_attr(test, mockall_double::double)]
use super::FromServerChannels;
use common::{from_server_interface::FromServerSender, to_server_interface::ToServerSender};
use reopen_file::ReopenFile;
use std::{
fmt::{self, Display},
path::{Path, PathBuf},
Expand All @@ -37,36 +59,36 @@ use std::{
use tokio::task::JoinHandle;

#[derive(Debug)]
pub enum PipesChannelContextError {
pub enum ControlInterfaceError {
CouldNotCreateFifo(String),
}

impl Display for PipesChannelContextError {
impl Display for ControlInterfaceError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PipesChannelContextError::CouldNotCreateFifo(msg) => {
ControlInterfaceError::CouldNotCreateFifo(msg) => {
write!(f, "{msg:?}")
}
}
}
}

// [impl->swdd~agent-create-control-interface-pipes-per-workload~1]
pub struct PipesChannelContext {
pub struct ControlInterface {
pipes: InputOutput,
input_pipe_sender: FromServerSender,
task_handle: JoinHandle<()>,
authorizer: Arc<Authorizer>,
}

#[cfg_attr(test, automock)]
impl PipesChannelContext {
impl ControlInterface {
pub fn new(
run_directory: &Path,
execution_instance_name: &WorkloadInstanceName,
output_pipe_channel: ToServerSender,
authorizer: Authorizer,
) -> Result<Self, PipesChannelContextError> {
) -> Result<Self, ControlInterfaceError> {
// [impl->swdd~agent-control-interface-pipes-path-naming~1]
match InputOutput::new(execution_instance_name.pipes_folder_name(run_directory)) {
Ok(pipes) => {
Expand All @@ -77,10 +99,10 @@ impl PipesChannelContext {

let authorizer = Arc::new(authorizer);

Ok(PipesChannelContext {
Ok(ControlInterface {
pipes,
input_pipe_sender: input_pipe_channels.get_sender(),
task_handle: PipesChannelTask::new(
task_handle: ControlInterfaceTask::new(
output_stream,
input_stream,
input_pipe_channels.move_receiver(),
Expand All @@ -92,7 +114,7 @@ impl PipesChannelContext {
authorizer,
})
}
Err(e) => Err(PipesChannelContextError::CouldNotCreateFifo(e.to_string())),
Err(e) => Err(ControlInterfaceError::CouldNotCreateFifo(e.to_string())),
}
}

Expand All @@ -111,14 +133,14 @@ impl PipesChannelContext {
self.input_pipe_sender.clone()
}

pub fn abort_pipes_channel_task(&self) {
pub fn abort_control_interface_task(&self) {
self.task_handle.abort();
}
}

impl Drop for PipesChannelContext {
impl Drop for ControlInterface {
fn drop(&mut self) {
self.abort_pipes_channel_task()
self.abort_control_interface_task()
}
}

Expand All @@ -140,15 +162,18 @@ mod tests {
const CONFIG: &str = "config";

use crate::control_interface::{
generate_test_input_output_mock, generate_test_pipes_channel_task_mock, MockAuthorizer,
MockFromServerChannels, MockReopenFile, PipesChannelContext,
authorizer::MockAuthorizer,
control_interface_task::generate_test_control_interface_task_mock,
from_server_channels::MockFromServerChannels,
input_output::generate_test_input_output_mock, reopen_file::MockReopenFile,
ControlInterface,
};
use common::objects::WorkloadInstanceName;

// [utest->swdd~agent-create-control-interface-pipes-per-workload~1]
// [utest->swdd~agent-control-interface-pipes-path-naming~1]
#[tokio::test]
async fn utest_pipes_channel_context_get_api_location_returns_valid_location() {
async fn utest_control_interface_get_api_location_returns_valid_location() {
let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC
.get_lock_async()
.await;
Expand All @@ -173,9 +198,9 @@ mod tests {
mock
});

let _pipes_channel_task_mock = generate_test_pipes_channel_task_mock();
let _control_interface_task_mock = generate_test_control_interface_task_mock();

let pipes_channel_context = PipesChannelContext::new(
let control_interface = ControlInterface::new(
Path::new("api_pipes_location"),
&WorkloadInstanceName::builder()
.workload_name("workload_name_1")
Expand All @@ -187,7 +212,7 @@ mod tests {
.unwrap();

assert_eq!(
pipes_channel_context
control_interface
.get_api_location()
.as_os_str()
.to_string_lossy(),
Expand Down Expand Up @@ -223,9 +248,9 @@ mod tests {
mock
});

let _pipes_channel_task_mock = generate_test_pipes_channel_task_mock();
let _control_interface_task_mock = generate_test_control_interface_task_mock();

let pipes_channel_context = PipesChannelContext::new(
let control_interface = ControlInterface::new(
Path::new("api_pipes_location"),
&WorkloadInstanceName::builder()
.agent_name("workload_name_1")
Expand All @@ -236,7 +261,7 @@ mod tests {
)
.unwrap();

let _ = pipes_channel_context
let _ = control_interface
.get_input_pipe_sender()
.send(FromServer::UpdateWorkload(
common::commands::UpdateWorkload {
Expand All @@ -256,6 +281,6 @@ mod tests {
receiver.recv().await
);

pipes_channel_context.abort_pipes_channel_task();
control_interface.abort_control_interface_task();
}
}
6 changes: 4 additions & 2 deletions agent/src/control_interface/authorizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ impl Authorizer {
pub fn authorize(&self, request: &Request) -> bool {
match &request.request_content {
common::commands::RequestContent::CompleteStateRequest(r) => {
let default_mask = vec!["".into()];
let field_mask = if r.field_mask.is_empty() {
&vec!["".into()]
&default_mask
} else {
&r.field_mask
};
Expand Down Expand Up @@ -108,8 +109,9 @@ impl Authorizer {
})
}
common::commands::RequestContent::UpdateStateRequest(r) => {
let default_mask = vec!["".into()];
let update_mask: &Vec<_> = if r.update_mask.is_empty() {
&vec!["".into()]
&default_mask
} else {
&r.update_mask
};
Expand Down
Loading