From f8c40a47bbc98da2f97e55873b19b78e75f7fe39 Mon Sep 17 00:00:00 2001 From: Bo Peng Date: Fri, 15 Jun 2018 22:24:11 -0500 Subject: [PATCH] Connect mini forward workflow triggeed by extended sos_step target. #983 --- src/sos/dag.py | 63 ++++++++++++++++++++---------------- src/sos/workflow_executor.py | 25 +++++++------- test/test_execute.py | 14 ++++---- 3 files changed, 57 insertions(+), 45 deletions(-) diff --git a/src/sos/dag.py b/src/sos/dag.py index 90f1b8a6e..fa1ffa39d 100644 --- a/src/sos/dag.py +++ b/src/sos/dag.py @@ -81,10 +81,11 @@ class SoS_Node(object): - def __init__(self, step_uuid: str, node_name: str, node_index: Union[str, None], input_targets: sos_targets, depends_targets: sos_targets, + def __init__(self, step_uuid: str, node_name: str, wf_index: Union[int, None], node_index: Union[int, None], input_targets: sos_targets, depends_targets: sos_targets, output_targets: sos_targets, context: dict) -> None: self._step_uuid = step_uuid self._node_id = node_name + self._wf_index = wf_index self._node_index = node_index self._input_targets = input_targets self._depends_targets = depends_targets @@ -116,13 +117,18 @@ def __init__(self, *args, **kwargs): self._all_dependent_files: DefaultDict[BaseTarget, List] = defaultdict( list) self._all_output_files = defaultdict(list) + # index of mini + self._forward_workflow_id = 0 + + def new_forward_workflow(self): + self._forward_workflow_id += 1 def num_nodes(self): return nx.number_of_nodes(self) def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets, depends_targets: sos_targets, output_targets: sos_targets, context: dict={}): - node = SoS_Node(step_uuid, node_name, node_index, input_targets, depends_targets, + node = SoS_Node(step_uuid, node_name, None if node_index is None else self._forward_workflow_id, node_index, input_targets, depends_targets, output_targets, context) if node._node_uuid in [x._node_uuid for x in self.nodes()]: return @@ -284,33 +290,34 @@ def build(self, steps): # refer to http://stackoverflow.com/questions/33494376/networkx-add-edges-to-graph-from-node-attributes # # several cases triggers dependency. - indexed = [x for x in self.nodes() if x._node_index is not None] - indexed.sort(key=lambda x: x._node_index) - - for idx, node in enumerate(indexed): - # 1. if a node changes context (using option alias), all later steps - # has to rely on it. - if node._context['__changed_vars__']: - for later_node in indexed[idx + 1:]: - if node._context['__changed_vars__'] & (later_node._context['__signature_vars__'] | later_node._context['__environ_vars__']): - self.add_edge(node, later_node) - - # 2. if the input of a step is undetermined, it has to be executed - # after all its previous steps. - if not node._input_targets.determined() and idx > 0: - # if there is some input specified, it does not use default - # input, so the relationship can be further looked before - if isinstance(node._input_targets._undetermined, str): - # if the input is dynamic, has to rely on previous step... - if 'dynamic' in node._context['__environ_vars__']: - self.add_edge(indexed[idx - 1], node) + for wf in range(self._forward_workflow_id + 1): + indexed = [x for x in self.nodes() if x._wf_index == wf] + indexed.sort(key=lambda x: x._node_index) + + for idx, node in enumerate(indexed): + # 1. if a node changes context (using option alias), all later steps + # has to rely on it. + if node._context['__changed_vars__']: + for later_node in indexed[idx + 1:]: + if node._context['__changed_vars__'] & (later_node._context['__signature_vars__'] | later_node._context['__environ_vars__']): + self.add_edge(node, later_node) + + # 2. if the input of a step is undetermined, it has to be executed + # after all its previous steps. + if not node._input_targets.determined() and idx > 0: + # if there is some input specified, it does not use default + # input, so the relationship can be further looked before + if isinstance(node._input_targets._undetermined, str): + # if the input is dynamic, has to rely on previous step... + if 'dynamic' in node._context['__environ_vars__']: + self.add_edge(indexed[idx - 1], node) + else: + # otherwise let us look back. + for prev_node in indexed[idx - 1::-1]: + if node._context['__environ_vars__'] & prev_node._context['__changed_vars__']: + self.add_edge(prev_node, node) else: - # otherwise let us look back. - for prev_node in indexed[idx - 1::-1]: - if node._context['__environ_vars__'] & prev_node._context['__changed_vars__']: - self.add_edge(prev_node, node) - else: - self.add_edge(indexed[idx - 1], node) + self.add_edge(indexed[idx - 1], node) # # 3. if the input of a step depends on the output of another step for target, in_node in self._all_dependent_files.items(): diff --git a/src/sos/workflow_executor.py b/src/sos/workflow_executor.py index c8b0658b1..cfda41033 100755 --- a/src/sos/workflow_executor.py +++ b/src/sos/workflow_executor.py @@ -579,8 +579,11 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= if len(mo) > 1: # sos_step('a') could match to step a_1, a_2, etc, in this case we are adding a subworkflow if isinstance(target, sos_step): + # create a new forward_workflow that is different from the master one + dag.new_forward_workflow() # get the step names - sections = sorted([x[0] for x in mo], key=lambda x: x.step_name()) + sections = sorted([x[0] for x in mo], + key=lambda x: x.step_name()) # no default input default_input: sos_targets = sos_targets() # @@ -589,8 +592,10 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= continue res = analyze_section(section, default_input) - environ_vars = res['environ_vars'] - self._base_symbols - signature_vars = res['signature_vars'] - self._base_symbols + environ_vars = res['environ_vars'] - \ + self._base_symbols + signature_vars = res['signature_vars'] - \ + self._base_symbols changed_vars = res['changed_vars'] # parameters, if used in the step, should be considered environmental environ_vars |= env.parameter_vars & signature_vars @@ -617,16 +622,14 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= } if idx == 0: context['__step_output__'] = env.sos_dict['__step_output__'] - else: - res['step_depends'].extend(sos_step(sections[idx-1].step_name())) - if idx == len(sections) - 1: - # for the last step, we say the mini-subworkflow satisfies sos_step('a') - # we have to do it this way because by default the DAG only sees sos_step('a_1') etc - res['step_output'].extend(target) + elif idx == len(sections) - 1: + # for the last step, we say the mini-subworkflow satisfies sos_step('a') + # we have to do it this way because by default the DAG only sees sos_step('a_1') etc + res['step_output'].extend(target) node_name = section.step_name() dag.add_step(section.uuid, - node_name, None, + node_name, idx, res['step_input'], res['step_depends'], res['step_output'], @@ -634,7 +637,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= default_input = res['step_output'] added_node += len(sections) resolved += 1 - #dag.show_nodes() + # dag.show_nodes() continue else: raise RuntimeError( diff --git a/test/test_execute.py b/test/test_execute.py index 3e361ef76..56ed9fe74 100644 --- a/test/test_execute.py +++ b/test/test_execute.py @@ -1106,7 +1106,6 @@ def testExecuteIPynb(self): wf = script.workflow() Base_Executor(wf).run() - def testOutputReport(self): '''Test generation of report''' if os.path.isfile('report.html'): @@ -1165,7 +1164,6 @@ def testOutputReportWithDAG(self): content = rep.read() self.assertTrue('Execution DAG' in content) - def testSoSStepWithOutput(self): '''Test checking output of sos_step #981''' script = SoS_Script(''' @@ -1180,19 +1178,20 @@ def testSoSStepWithOutput(self): wf = script.workflow() Base_Executor(wf).run() - def testMultiSoSStep(self): '''Test matching 'a_1', 'a_2' etc with sos_step('a')''' file_target('a_1').remove('all') file_target('a_2').remove('all') script = SoS_Script(''' [a_1] +output: "a_1" sh: -touch a_1 + echo whatever > a_1 [a_2] -sh: -touch a_2 +output: "a_2" +sh: expand=True + cp {_input} {_output} [default] depends: sos_step('a') @@ -1202,6 +1201,9 @@ def testMultiSoSStep(self): self.assertEqual(res['__completed__']['__step_completed__'], 3) self.assertTrue(os.path.isfile('a_1')) self.assertTrue(os.path.isfile('a_2')) + with open('a_1') as a1, open('a_2') as a2: + self.assertEqual(a1.read(), a2.read()) + if __name__ == '__main__': unittest.main()