Skip to content

Commit

Permalink
Connect mini forward workflow triggered by extended sos_step target. #983
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jun 16, 2018
1 parent 4fd39a9 commit f8c40a4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 45 deletions.
63 changes: 35 additions & 28 deletions src/sos/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@


class SoS_Node(object):
def __init__(self, step_uuid: str, node_name: str, node_index: Union[str, None], input_targets: sos_targets, depends_targets: sos_targets,
def __init__(self, step_uuid: str, node_name: str, wf_index: Union[int, None], node_index: Union[int, None], input_targets: sos_targets, depends_targets: sos_targets,
output_targets: sos_targets, context: dict) -> None:
self._step_uuid = step_uuid
self._node_id = node_name
self._wf_index = wf_index
self._node_index = node_index
self._input_targets = input_targets
self._depends_targets = depends_targets
Expand Down Expand Up @@ -116,13 +117,18 @@ def __init__(self, *args, **kwargs):
self._all_dependent_files: DefaultDict[BaseTarget, List] = defaultdict(
list)
self._all_output_files = defaultdict(list)
# index of mini
self._forward_workflow_id = 0

def new_forward_workflow(self):
self._forward_workflow_id += 1

def num_nodes(self):
return nx.number_of_nodes(self)

def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets, depends_targets: sos_targets,
output_targets: sos_targets, context: dict={}):
node = SoS_Node(step_uuid, node_name, node_index, input_targets, depends_targets,
node = SoS_Node(step_uuid, node_name, None if node_index is None else self._forward_workflow_id, node_index, input_targets, depends_targets,
output_targets, context)
if node._node_uuid in [x._node_uuid for x in self.nodes()]:
return
Expand Down Expand Up @@ -284,33 +290,34 @@ def build(self, steps):
# refer to http://stackoverflow.com/questions/33494376/networkx-add-edges-to-graph-from-node-attributes
#
# several cases triggers dependency.
indexed = [x for x in self.nodes() if x._node_index is not None]
indexed.sort(key=lambda x: x._node_index)

for idx, node in enumerate(indexed):
# 1. if a node changes context (using option alias), all later steps
# has to rely on it.
if node._context['__changed_vars__']:
for later_node in indexed[idx + 1:]:
if node._context['__changed_vars__'] & (later_node._context['__signature_vars__'] | later_node._context['__environ_vars__']):
self.add_edge(node, later_node)

# 2. if the input of a step is undetermined, it has to be executed
# after all its previous steps.
if not node._input_targets.determined() and idx > 0:
# if there is some input specified, it does not use default
# input, so the relationship can be further looked before
if isinstance(node._input_targets._undetermined, str):
# if the input is dynamic, has to rely on previous step...
if 'dynamic' in node._context['__environ_vars__']:
self.add_edge(indexed[idx - 1], node)
for wf in range(self._forward_workflow_id + 1):
indexed = [x for x in self.nodes() if x._wf_index == wf]
indexed.sort(key=lambda x: x._node_index)

for idx, node in enumerate(indexed):
# 1. if a node changes context (using option alias), all later steps
# has to rely on it.
if node._context['__changed_vars__']:
for later_node in indexed[idx + 1:]:
if node._context['__changed_vars__'] & (later_node._context['__signature_vars__'] | later_node._context['__environ_vars__']):
self.add_edge(node, later_node)

# 2. if the input of a step is undetermined, it has to be executed
# after all its previous steps.
if not node._input_targets.determined() and idx > 0:
# if there is some input specified, it does not use default
# input, so the relationship can be further looked before
if isinstance(node._input_targets._undetermined, str):
# if the input is dynamic, has to rely on previous step...
if 'dynamic' in node._context['__environ_vars__']:
self.add_edge(indexed[idx - 1], node)
else:
# otherwise let us look back.
for prev_node in indexed[idx - 1::-1]:
if node._context['__environ_vars__'] & prev_node._context['__changed_vars__']:
self.add_edge(prev_node, node)
else:
# otherwise let us look back.
for prev_node in indexed[idx - 1::-1]:
if node._context['__environ_vars__'] & prev_node._context['__changed_vars__']:
self.add_edge(prev_node, node)
else:
self.add_edge(indexed[idx - 1], node)
self.add_edge(indexed[idx - 1], node)
#
# 3. if the input of a step depends on the output of another step
for target, in_node in self._all_dependent_files.items():
Expand Down
25 changes: 14 additions & 11 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,11 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
if len(mo) > 1:
# sos_step('a') could match to step a_1, a_2, etc, in this case we are adding a subworkflow
if isinstance(target, sos_step):
# create a new forward_workflow that is different from the master one
dag.new_forward_workflow()
# get the step names
sections = sorted([x[0] for x in mo], key=lambda x: x.step_name())
sections = sorted([x[0] for x in mo],
key=lambda x: x.step_name())
# no default input
default_input: sos_targets = sos_targets()
#
Expand All @@ -589,8 +592,10 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
continue
res = analyze_section(section, default_input)

environ_vars = res['environ_vars'] - self._base_symbols
signature_vars = res['signature_vars'] - self._base_symbols
environ_vars = res['environ_vars'] - \
self._base_symbols
signature_vars = res['signature_vars'] - \
self._base_symbols
changed_vars = res['changed_vars']
# parameters, if used in the step, should be considered environmental
environ_vars |= env.parameter_vars & signature_vars
Expand All @@ -617,24 +622,22 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
}
if idx == 0:
context['__step_output__'] = env.sos_dict['__step_output__']
else:
res['step_depends'].extend(sos_step(sections[idx-1].step_name()))
if idx == len(sections) - 1:
# for the last step, we say the mini-subworkflow satisfies sos_step('a')
# we have to do it this way because by default the DAG only sees sos_step('a_1') etc
res['step_output'].extend(target)
elif idx == len(sections) - 1:
# for the last step, we say the mini-subworkflow satisfies sos_step('a')
# we have to do it this way because by default the DAG only sees sos_step('a_1') etc
res['step_output'].extend(target)

node_name = section.step_name()
dag.add_step(section.uuid,
node_name, None,
node_name, idx,
res['step_input'],
res['step_depends'],
res['step_output'],
context=context)
default_input = res['step_output']
added_node += len(sections)
resolved += 1
#dag.show_nodes()
# dag.show_nodes()
continue
else:
raise RuntimeError(
Expand Down
14 changes: 8 additions & 6 deletions test/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,6 @@ def testExecuteIPynb(self):
wf = script.workflow()
Base_Executor(wf).run()


def testOutputReport(self):
'''Test generation of report'''
if os.path.isfile('report.html'):
Expand Down Expand Up @@ -1165,7 +1164,6 @@ def testOutputReportWithDAG(self):
content = rep.read()
self.assertTrue('Execution DAG' in content)


def testSoSStepWithOutput(self):
'''Test checking output of sos_step #981'''
script = SoS_Script('''
Expand All @@ -1180,19 +1178,20 @@ def testSoSStepWithOutput(self):
wf = script.workflow()
Base_Executor(wf).run()


def testMultiSoSStep(self):
'''Test matching 'a_1', 'a_2' etc with sos_step('a')'''
file_target('a_1').remove('all')
file_target('a_2').remove('all')
script = SoS_Script('''
[a_1]
output: "a_1"
sh:
touch a_1
echo whatever > a_1
[a_2]
sh:
touch a_2
output: "a_2"
sh: expand=True
cp {_input} {_output}
[default]
depends: sos_step('a')
Expand All @@ -1202,6 +1201,9 @@ def testMultiSoSStep(self):
self.assertEqual(res['__completed__']['__step_completed__'], 3)
self.assertTrue(os.path.isfile('a_1'))
self.assertTrue(os.path.isfile('a_2'))
with open('a_1') as a1, open('a_2') as a2:
self.assertEqual(a1.read(), a2.read())


if __name__ == '__main__':
unittest.main()

0 comments on commit f8c40a4

Please sign in to comment.