Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized flow node execution by only trying to execute flow nodes that have incoming tokens #13

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions core/src/bpmn/collaboration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,20 @@ impl Collaboration {
panic!("No process found for snapshot with id \"{}\"", snapshot.id)
}
Some(process) => {
// TODO: Would be nice to only try to execute flow nodes that have incoming tokens/messages. But currently sfs are contained in flow nodes not in the process itself.
for flow_node in process.flow_nodes.iter() {
// Only try to execute flow nodes that have incoming tokens.
let mut flow_node_indexes: Vec<&usize> = snapshot
.tokens
.iter()
.filter_map(|(&token_position, _)| {
process.sequence_flow_index.get(token_position)
})
.collect();
flow_node_indexes.sort();
flow_node_indexes.dedup(); // Do not try to execute a flow node twice.
for flow_node in flow_node_indexes
.iter()
.filter_map(|&flow_node_idx| process.flow_nodes.get(*flow_node_idx))
{
let new_states = flow_node.try_execute(
snapshot,
state,
Expand Down
4 changes: 4 additions & 0 deletions core/src/bpmn/process.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::bpmn::flow_node::{FlowNode, SequenceFlow};
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub struct Process {
pub id: String,
pub flow_nodes: Vec<FlowNode>,
pub sequence_flow_index: HashMap<String, usize>, // Map from sf_id to target flow node index.
}

impl Process {
Expand All @@ -16,6 +18,8 @@ impl Process {
let target_idx = self.find_flow_node_idx_by_id(target_ref);
match (source_idx, target_idx) {
(Some(source_idx), Some(target_idx)) => {
self.sequence_flow_index.insert(id.clone(), target_idx);

let source = self.flow_nodes.get_mut(source_idx).unwrap();
source.add_outgoing_flow(SequenceFlow {
id: id.clone(),
Expand Down
13 changes: 7 additions & 6 deletions core/src/bpmn/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::bpmn::flow_node::{EventType, FlowNode, FlowNodeType, TaskType};
use crate::bpmn::process::Process;
use quick_xml::events::{BytesStart, Event};
use quick_xml::reader::Reader;
use std::collections::HashMap;
use std::{fmt, fs};

#[derive(Debug)]
Expand Down Expand Up @@ -50,8 +51,7 @@ pub fn read_bpmn_from_string(
Ok(Event::Start(e)) => match e.local_name().as_ref() {
b"process" => {
add_participant(&mut collaboration, &e);
current_participant =
Some(get_attribute_value_or_panic(&e, &String::from("id")));
current_participant = Some(get_attribute_value_or_panic(&e, "id"));
}
b"subProcess" => {
if has_true_attribute_value(&e, "triggeredByEvent") {
Expand Down Expand Up @@ -192,7 +192,7 @@ pub fn read_bpmn_from_string(
if !unsupported_elements.is_empty() {
let unsupported_elements: Vec<String> = unsupported_elements
.iter()
.map(|e| get_attribute_value_or_panic(e, &String::from("id")))
.map(|e| get_attribute_value_or_panic(e, "id"))
.collect();
return Err(UnsupportedBpmnElementsError {
unsupported_elements,
Expand All @@ -218,10 +218,11 @@ fn read_file(path: &str) -> String {
}

fn add_participant(collaboration: &mut Collaboration, p_bytes: &BytesStart) {
let id = get_attribute_value_or_panic(p_bytes, &String::from("id"));
let id = get_attribute_value_or_panic(p_bytes, "id");
collaboration.add_participant(Process {
id,
flow_nodes: Vec::new(),
flow_nodes: vec![],
sequence_flow_index: HashMap::new(),
});
}

Expand All @@ -245,7 +246,7 @@ fn add_flow_node(
flow_node_bytes: &BytesStart,
flow_node_type: FlowNodeType,
) {
let id = get_attribute_value_or_panic(flow_node_bytes, &String::from("id"));
let id = get_attribute_value_or_panic(flow_node_bytes, "id");
let last_participant = collaboration.participants.last_mut();
match last_participant {
None => {
Expand Down
4 changes: 3 additions & 1 deletion core/src/bpmn/reader_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod tests {
use crate::bpmn::flow_node::{EventType, FlowNode, TaskType};
use crate::bpmn::process::Process;
use crate::bpmn::reader::read_bpmn_from_file;
use std::collections::HashMap;

const PATH: &str = "tests/resources/unit/";

Expand Down Expand Up @@ -36,7 +37,8 @@ mod tests {
};
let mut process = Process {
id: String::from("process_id"),
flow_nodes: Vec::new(),
flow_nodes: vec![],
sequence_flow_index: HashMap::new(),
};
process.add_flow_node(FlowNode::new(
String::from("start"),
Expand Down