From 58681ecb1e242b6bfe74e8cf30ae0406fd2396a4 Mon Sep 17 00:00:00 2001 From: Florian Wuermseer Date: Wed, 2 Aug 2023 21:48:06 +0200 Subject: [PATCH] feat: support an arbitrary amount of in/outputs --- Cargo.toml | 11 +- flow-derive/Cargo.toml | 13 - flowrs_derive/Cargo.toml | 21 ++ flowrs_derive/README.md | 11 + .../src/connectable.rs | 35 ++- {flow-derive => flowrs_derive}/src/lib.rs | 0 tests/lib.rs | 1 + tests/nodes/mod.rs | 1 + tests/nodes/node.rs | 279 ++++++++++++++++++ 9 files changed, 354 insertions(+), 18 deletions(-) delete mode 100644 flow-derive/Cargo.toml create mode 100644 flowrs_derive/Cargo.toml create mode 100644 flowrs_derive/README.md rename {flow-derive => flowrs_derive}/src/connectable.rs (70%) rename {flow-derive => flowrs_derive}/src/lib.rs (100%) create mode 100644 tests/lib.rs create mode 100644 tests/nodes/mod.rs create mode 100644 tests/nodes/node.rs diff --git a/Cargo.toml b/Cargo.toml index 68903c2..78d2f6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,13 +2,22 @@ name = "flowrs" version = "0.1.0" edition = "2021" +authors = ["wuermseer.florian@gmail.com", "markus.friedrich@hm.edu"] +description = "A generic and Type-Safe WASM library for Flow-Based Programming." +repository = "https://github.com/flow-rs/flowrs" +license = "Apache-2.0" +license-file = "LICENSE" +readme = "README.md" +documentation = "https://docs.rs/flowrs" +keywords = ["flow", "fbp", "wasm"] +categories = ["data-structures", "wasm"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] crate-type = ["cdylib", "rlib"] [dependencies] -flow-derive = { path = "flow-derive" } +flowrs_derive = { version = "0.1.0", path = "flowrs_derive" } serde = { version = "1.0.166", features = ["derive", "rc"] } serde_json = "1.0.100" threadpool = "1.8.1" diff --git a/flow-derive/Cargo.toml b/flow-derive/Cargo.toml deleted file mode 100644 index 5b47d8f..0000000 --- a/flow-derive/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "flow-derive" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -quote = "1.0.29" -syn = { version = "2.0.28", features = ["full"] } - -[lib] -proc-macro = true diff --git a/flowrs_derive/Cargo.toml b/flowrs_derive/Cargo.toml new file mode 100644 index 0000000..8435fc9 --- /dev/null +++ b/flowrs_derive/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "flowrs-derive" +version = "0.2.0" +edition = "2021" +authors = ["wuermseer.florian@gmail.com", "markus.friedrich@hm.edu"] +description = "A macro for connecting flowrs Nodes during runtime." +repository = "https://github.com/flow-rs/flowrs/tree/master/flow_derive" +license = "Apache-2.0" +readme = "README.md" +documentation = "https://docs.rs/flowrs_derive" +keywords = ["flow", "fbp", "wasm"] +categories = ["data-structures", "wasm"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +quote = "1.0.29" +syn = { version = "2.0.28", features = ["full"] } + +[lib] +proc-macro = true diff --git a/flowrs_derive/README.md b/flowrs_derive/README.md new file mode 100644 index 0000000..e93d70b --- /dev/null +++ b/flowrs_derive/README.md @@ -0,0 +1,11 @@ +# Executing Tests + +All Rust internal test can be executed using: + +```sh +$ cargo test +``` + +# Contributing + +Please read our [Contribution Guidelines](https://github.com/flow-rs/flowrs/blob/master/CONTRIBUTING.md) first. diff --git a/flow-derive/src/connectable.rs b/flowrs_derive/src/connectable.rs similarity index 70% rename from flow-derive/src/connectable.rs rename to flowrs_derive/src/connectable.rs index 9b8d8cf..888991d 100644 --- a/flow-derive/src/connectable.rs +++ b/flowrs_derive/src/connectable.rs @@ -1,6 +1,6 @@ use core::panic; use proc_macro::TokenStream; -use syn::{Arm, DataStruct, DeriveInput, Field, Type}; +use syn::{Arm, DataStruct, DeriveInput, Field, Type, WherePredicate}; pub fn impl_connectable_trait(ast: DeriveInput) -> TokenStream { let struct_ident = ast.clone().ident; @@ -43,12 +43,12 @@ pub fn impl_connectable_trait(ast: DeriveInput) -> TokenStream { }) .collect::>(); let (_, ty_generics, _) = ast.generics.split_for_impl(); + let mut generic_bounds = get_generic_bounds(inputs.clone()); + generic_bounds.append(&mut get_generic_bounds(outputs)); quote::quote! { impl #ty_generics RuntimeConnectable for #struct_ident #ty_generics where - I1: Clone + 'static, - I2: Clone + 'static, - O: Clone + 'static, + #(#generic_bounds,)* { fn input_at(&self, index: usize) -> Rc { match index { @@ -68,6 +68,33 @@ pub fn impl_connectable_trait(ast: DeriveInput) -> TokenStream { .into() } +fn get_generic_bounds(fields: Vec) -> Vec { + fields + .iter() + .map(|f| match &f.ty { + Type::Path(path) => match &path.path.segments.first().unwrap().arguments { + syn::PathArguments::AngleBracketed(angle) => match angle.args.first().unwrap() { + syn::GenericArgument::Type(generic) => match generic { + Type::Path(t_path) => { + let ident = t_path.path.segments.first().unwrap().ident.clone(); + let cond: TokenStream = quote::quote! { + #ident: Clone + 'static + } + .into(); + let cond_ast: WherePredicate = syn::parse(cond.clone()).unwrap(); + cond_ast + } + _ => todo!(), + }, + _ => todo!(), + }, + _ => todo!(), + }, + _ => todo!(), + }) + .collect::>() +} + fn validate_struct_field(strct: DataStruct, ty: &str, mcro: &str) -> Vec { strct .fields diff --git a/flow-derive/src/lib.rs b/flowrs_derive/src/lib.rs similarity index 100% rename from flow-derive/src/lib.rs rename to flowrs_derive/src/lib.rs diff --git a/tests/lib.rs b/tests/lib.rs new file mode 100644 index 0000000..3aa91ec --- /dev/null +++ b/tests/lib.rs @@ -0,0 +1 @@ +mod nodes; \ No newline at end of file diff --git a/tests/nodes/mod.rs b/tests/nodes/mod.rs new file mode 100644 index 0000000..12e2c60 --- /dev/null +++ b/tests/nodes/mod.rs @@ -0,0 +1 @@ +pub mod node; \ No newline at end of file diff --git a/tests/nodes/node.rs b/tests/nodes/node.rs new file mode 100644 index 0000000..48c8f50 --- /dev/null +++ b/tests/nodes/node.rs @@ -0,0 +1,279 @@ +use std::any::Any; +use std::ops::Add; +use std::rc::Rc; + +use serde_json::Value; + +use flowrs::{ + connection::{Input, Output, RuntimeConnectable}, + node::{Context, Node, SequenceError, State, UpdateError}, +}; +use flowrs_derive::Connectable; + +#[derive(Clone)] +enum AddNodeState { + I1(I1), + I2(I2), + None, +} + +#[derive(Connectable)] +pub struct AddNode +where + I1: Clone, + I2: Clone, +{ + name: String, + state: State>, + _props: Value, + _context: State, + + #[input] + pub input_1: Input, + #[input] + pub input_2: Input, + #[output] + pub output_1: Output, +} + +impl AddNode +where + I1: Clone + Add + Send + 'static, + I2: Clone + Send + 'static, + O: Clone + Send + 'static, +{ + pub fn new(name: &str, context: State, props: Value) -> Self { + Self { + name: name.into(), + state: State::new(AddNodeState::None), + _props: props, + _context: context.clone(), + + input_1: Input::new(), + input_2: Input::new(), + output_1: Output::new(context.clone()), + } + } + + fn handle_1(&self, v: I1) -> Result<(), UpdateError> { + let mut state = self.state.0.lock().unwrap(); + match state.clone() { + AddNodeState::I1(_) => { + return Err(UpdateError::SequenceError(SequenceError { + node: self.name().into(), + message: "Addition should happen pairwise.".into(), + })) + } + AddNodeState::I2(i) => { + let out = v + i.clone(); + *state = AddNodeState::None; + let _ = self.output_1.clone().send(out); + } + AddNodeState::None => *state = AddNodeState::I1(v), + } + Ok(()) + } + + fn handle_2(&self, v: I2) -> Result<(), UpdateError> { + let mut state = self.state.0.lock().unwrap(); + match state.clone() { + AddNodeState::I2(_) => { + return Err(UpdateError::SequenceError(SequenceError { + node: self.name().into(), + message: "Addition should happen pairwise.".into(), + })) + } + AddNodeState::I1(i) => { + let out = i.clone() + v; + *state = AddNodeState::None; + let _ = self.output_1.clone().send(out); + } + AddNodeState::None => *state = AddNodeState::I2(v), + } + Ok(()) + } +} + +impl Node for AddNode +where + I1: Add + Clone + Send + 'static, + I2: Clone + Send + 'static, + O: Clone + Send + 'static, +{ + fn on_init(&self) {} + + fn on_ready(&self) {} + + fn on_shutdown(&self) {} + + fn name(&self) -> &str { + &self.name + } + + // To be replaced by macro + fn update(&self) -> Result<(), UpdateError> { + if let Ok(i1) = self.input_1.next_elem() { + println!("UPDATE1"); + self.handle_1(i1)?; + } + + if let Ok(i2) = self.input_2.next_elem() { + println!("UPDATE2"); + self.handle_2(i2)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod nodes { + use std::{thread, rc::Rc, any::Any}; + + use flowrs::{connection::{ConnectError, Edge, connect, Input, RuntimeConnectable, Output}, node::{Context, State, Node}}; + use serde_json::Value; + + use super::AddNode; + + + + #[test] + fn should_add_132() -> Result<(), ConnectError> { + let context = State::new(Context::new()); + let add = AddNode::new("AddNodeI32", context, Value::Null); + let mock_output = Edge::new(); + connect(add.output_1.clone(), mock_output.clone()); + let _ = add.input_1.send(1); + let _ = add.input_2.send(2); + let _ = add.update(); + let _ = add.update(); + + let expected = 3; + let actual = mock_output.next_elem()?; + Ok(assert!(expected == actual)) + } + + /// Scenario: + /// + /// [0, 1, ..., 100] + /// \ + /// >---[100, 100, ..., 100] + /// / + /// [100, 99, ..., 0] + #[test] + fn should_add_multiple_132_sequentially() -> Result<(), ConnectError> { + let context = State::new(Context::new()); + let add = AddNode::new("AddNodeI32", context, Value::Null); + let mock_output = Edge::new(); + connect(add.output_1.clone(), mock_output.clone()); + (0..100).for_each(|int| { + let _ = add.input_1.send(int); + }); + (0..101).rev().for_each(|int| { + let _ = add.input_2.send(int); + }); + (0..100).for_each(|_| { + let _ = add.update(); + }); + let mut actual = vec![]; + for _ in 0..100 { + let curr = mock_output.next_elem()?; + actual.push(curr) + } + let exected = vec![100; 100]; + Ok(assert!( + exected == actual, + "expected was: {:?} while actual was {:?}", + exected, + actual + )) + } + + #[test] + fn should_add_multiple_132_parallel() -> Result<(), ConnectError> { + let context = State::new(Context::new()); + let add1 = AddNode::new("AddNodeI32", context.clone(), Value::Null); + let add2 = AddNode::new("AddNodeI32", context, Value::Null); + let mock_output = Edge::new(); + connect(add1.output_1.clone(), add2.input_1.clone()); + connect(add2.output_1.clone(), mock_output.clone()); + (0..100).for_each(|int| { + let _ = add1.input_1.send(int); + }); + (0..101).rev().for_each(|int| { + let _ = add1.input_2.send(int); + }); + (0..100).rev().for_each(|_| { + let _ = add2.input_2.send(1); + }); + + let handle1 = thread::spawn(move || { + (0..100).for_each(|_| { + match add1.update() { + Ok(_) => (), + Err(e) => println!("{:?}", e), + }; + }); + }); + let handle2 = thread::spawn(move || { + (0..100).for_each(|_| { + match add2.update() { + Ok(_) => (), + Err(e) => println!("{:?}", e), + }; + }); + }); + + handle1.join().unwrap(); + handle2.join().unwrap(); + + let mut actual = vec![]; + for _ in 0..100 { + let curr = mock_output.next_elem(); + actual.push(curr) + } + Ok(assert!(!actual.is_empty())) + } + + #[test] + fn should_return_lhs_at_runtime() { + let context = State::new(Context::new()); + let add: AddNode = AddNode::new("AddNodeI32", context.clone(), Value::Null); + let input1: Rc = add.input_at(0); + let input1_downcasted = input1.downcast::>(); + assert!(input1_downcasted.is_ok()) + } + + #[test] + fn should_return_rhs_at_runtime() { + let context = State::new(Context::new()); + let add: AddNode = AddNode::new("AddNodeI32", context.clone(), Value::Null); + let input1: Rc = add.input_at(1); + let input1_downcasted = input1.downcast::>(); + assert!(input1_downcasted.is_ok()) + } + + #[test] + fn should_return_output_at_runtime() { + let context = State::new(Context::new()); + let add: AddNode = AddNode::new("AddNodeI32", context.clone(), Value::Null); + let input1: Rc = add.output_at(0); + let input1_downcasted = input1.downcast::>(); + assert!(input1_downcasted.is_ok()) + } + + #[test] + #[should_panic(expected = "Index 2 out of bounds for AddNode with input len 2.")] + fn should_fail_on_index_out_of_bounds() { + let context = State::new(Context::new()); + let add: AddNode = AddNode::new("AddNodeI32", context.clone(), Value::Null); + add.input_at(2); + } + + #[test] + #[should_panic(expected = "Index 1 out of bounds for AddNode with output len 1.")] + fn should_fail_on_output_out_of_bounds() { + let context = State::new(Context::new()); + let add: AddNode = AddNode::new("AddNodeI32", context.clone(), Value::Null); + add.output_at(1); + } +} \ No newline at end of file