Skip to content

Commit

Permalink
Implement option -T #1197
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jan 30, 2019
1 parent f8cad39 commit ddadbf9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 124 deletions.
64 changes: 24 additions & 40 deletions src/sos/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class SoS_DAG(nx.DiGraph):
def __init__(self, *args, **kwargs):
nx.DiGraph.__init__(self, *args, **kwargs)
# all_input
self._all_input_files = defaultdict(list)
self._all_depends_files = defaultdict(list)
self._all_output_files = defaultdict(list)
# index of mini
Expand All @@ -133,8 +132,8 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets,
self._all_output_files[sos_step(node_name.split(' ')[0])].append(node)

for x in input_targets:
if node not in self._all_input_files[x]:
self._all_input_files[x].append(node)
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
for x in depends_targets:
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
Expand All @@ -149,8 +148,8 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets,

def update_step(self, node, input_targets: sos_targets, output_targets: sos_targets, depends_targets: sos_targets):
for x in input_targets:
if node not in self._all_input_files[x]:
self._all_input_files[x].append(node)
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
for x in depends_targets:
if node not in self._all_depends_files[x]:
self._all_depends_files[x].append(node)
Expand Down Expand Up @@ -216,42 +215,34 @@ def circular_dependencies(self):
def steps_depending_on(self, target: BaseTarget, workflow):
if target in self._all_depends_files:
return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_depends_files[target]]))
elif target in self._all_input_files:
return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_input_files[target]]))
else:
return ''
return ''

def pending(self):
return [x for x in self.nodes() if x._status == 'failed'], [x for x in self.nodes() if x._status is None]

def dangling(self, targets: sos_targets):
missing = []
depending = []
'''returns
1. missing targets, which are missing from the DAG or from the provided targets
2. existing targets of provided target list, not in DAG
'''
existing = []
for x in self._all_input_files.keys():
# for an input file that exists and is not in the output files,
# declare it existing; if it is in the output files, nothing needs to be done.
if x.target_exists():
if x not in self._all_output_files:
existing.append(x)
# if it does not exist and is not in the output files, declare it missing
elif x not in self._all_output_files:
missing.append(x)
for x in self._all_depends_files.keys():
# for a dependent file that exists and is not in the output files,
# we still try to find steps to satisfy it
if x.target_exists():
missing = []
if env.config['trace_existing']:
for x in self._all_depends_files.keys():
if x not in self._all_output_files:
depending.append(x)
elif x not in self._all_output_files:
missing.append(x)
if x.target_exists():
existing.append(x)
else:
missing.append(x)
else:
missing = [x for x in self._all_depends_files.keys() if x not in self._all_output_files and not x.target_exists()]
for x in targets:
if x.target_exists():
if x not in self._all_output_files:
depending.append(x)
elif x not in self._all_output_files:
missing.append(x)
return missing, existing, depending
if x not in self._all_output_files:
if x.target_exists('target'):
existing.append(x)
else:
missing.append(x)
return missing, existing

def regenerate_target(self, target: BaseTarget):
if target in self._all_output_files:
Expand Down Expand Up @@ -328,13 +319,6 @@ def build(self):
for j in out_node:
if j != i:
self.add_edge(j, i)
#
for target, in_node in self._all_input_files.items():
for out_node in [y for (x, y) in self._all_output_files.items() if x == target]:
for i in in_node:
for j in out_node:
if j != i:
self.add_edge(j, i)

def save(self, dest=None):
if not dest:
Expand Down
12 changes: 9 additions & 3 deletions src/sos/section_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ def get_step_input(section, default_input):
'''Find step input
'''
step_input: sos_targets = sos_targets()
dynamic_input = True

# look for input statement.
input_idx = find_statement(section, 'input')
if input_idx is None:
return step_input
return step_input, dynamic_input

# input statement
stmt = section.statements[input_idx][2]
Expand All @@ -223,8 +225,10 @@ def get_step_input(section, default_input):
step_input = sos_targets()
else:
step_input = default_input
dynamic_input = False
elif not any(isinstance(x, (dynamic, remote)) for x in args):
step_input = sos_targets(*args)
dynamic_input = True
except SyntaxError:
raise
except Exception as e:
Expand All @@ -236,7 +240,7 @@ def get_step_input(section, default_input):
finally:
[env.sos_dict._dict.pop(x) for x in svars]
env.sos_dict._dict.update(old_values)
return step_input
return step_input, dynamic_input

def get_step_output(section, default_output):
'''determine step output'''
Expand Down Expand Up @@ -375,7 +379,9 @@ def analyze_section(section: SoS_Step, default_input: Optional[sos_targets] = No
'changed_vars': get_changed_vars(section)
}
if not vars_and_output_only:
res['step_input'] = get_step_input(section, default_input)
inps = get_step_input(section, default_input)
res['step_input'] = inps[0]
res['dynamic_input'] = inps[1]
deps = get_step_depends(section)
res['step_depends'] = deps[0]
res['dynamic_depends'] = deps[1]
Expand Down
12 changes: 8 additions & 4 deletions src/sos/step_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ def process_input_args(self, ifiles: sos_targets, **kwargs):

assert isinstance(ifiles, sos_targets)

if env.sos_dict.get('__dynamic_input__', False):
self.verify_dynamic_targets([x for x in ifiles if isinstance(x, file_target)])

# input file is the filtered files
env.sos_dict.set('step_input', ifiles)
env.sos_dict.set('_input', ifiles)
Expand All @@ -349,7 +352,7 @@ def process_input_args(self, ifiles: sos_targets, **kwargs):
#
return ifiles.groups

def verify_dynamic_depends(self, target):
def verify_dynamic_targets(self, target):
return True

def process_depends_args(self, dfiles: sos_targets, **kwargs):
Expand All @@ -360,7 +363,7 @@ def process_depends_args(self, dfiles: sos_targets, **kwargs):
raise ValueError(r"Depends needs to handle undetermined")

if env.sos_dict.get('__dynamic_depends__', False):
self.verify_dynamic_depends([x for x in dfiles if isinstance(x, file_target)])
self.verify_dynamic_targets([x for x in dfiles if isinstance(x, file_target)])

env.sos_dict.set('_depends', dfiles)
env.sos_dict.set('step_depends', dfiles)
Expand Down Expand Up @@ -1413,9 +1416,10 @@ def handle_unknown_target(self, e):
if not res:
raise e

def verify_dynamic_depends(self, targets):
if not targets:
def verify_dynamic_targets(self, targets):
if not targets or not env.config['trace_existing']:
return

self.socket.send_pyobj(['dependent_target'] + targets)
res = self.socket.recv()
if res != b'target_resolved':
Expand Down
95 changes: 19 additions & 76 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
'__environ_vars__': environ_vars,
'__changed_vars__': changed_vars,
'__dynamic_depends__': res['dynamic_depends'],
'__dynamic_input__': res['dynamic_input']
}
if idx == 0:
context['__step_output__'] = env.sos_dict['__step_output__']
Expand Down Expand Up @@ -605,6 +606,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']
context['__dynamic_input__'] = res['dynamic_input']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
Expand All @@ -620,91 +623,27 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
added_node += 1
resolved += 1

# for existing targets... we should check if it actually exists. If
# not it would still need to be regenerated
# for existing targets that are not in DAG
if not env.config['trace_existing']:
if added_node == 0:
break
else:
continue

node_added = False
existing_targets = set(dag.dangling(targets)[1])
for target in existing_targets:
if node_added:
existing_targets = set(dag.dangling(targets)[1])
node_added = False
if target not in existing_targets:
# target already in DAG and not reported as existing
continue
if file_target(target).target_exists('target') if isinstance(target, str) else target.target_exists('target'):
continue
mo = self.match(target)
if not mo:
# this is ok, this is just an existing target, no one is designed to
# generate it.
continue
if len(mo) > 1:
# this is not ok.
raise RuntimeError(
f'Multiple steps {", ".join(x.step_name() for x in mo)} to generate target {target}')
#
# only one step matches, so we process it: execute the section with the specified input
#
if not isinstance(mo[0], tuple):
section = mo[0]
env.sos_dict['__default_output__'] = sos_targets(target)
context = {}
else:
section = mo[0][0]
if isinstance(mo[0][1], dict):
for k, v in mo[0][1].items():
env.sos_dict.set(k, v)

if mo[0][1]:
env.sos_dict['__default_output__'] = sos_targets(target)
context = {}
else:
env.sos_dict['__default_output__'] = sos_targets(
section.options['provides'])
context = mo[0][1]
# will become input, set to None
env.sos_dict['__step_output__'] = sos_targets()
#
res = analyze_section(section, default_output=env.sos_dict['__default_output__'])
#
# build DAG with input and output files of step
env.logger.debug(
f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}')

context['__signature_vars__'] = res['signature_vars']
context['__environ_vars__'] = res['environ_vars']
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
# with identical names.
node_name = section.step_name()
if env.sos_dict["__default_output__"]:
node_name += f' {short_repr(env.sos_dict["__default_output__"])})'
dag.add_step(section.uuid, node_name,
None, res['step_input'],
res['step_depends'], res['step_output'], context=context)
node_added = True
added_node += 1
# this case does not count as resolved
# resolved += 1

# for depending targets... they already exist but we will add
# nodes that generate them if available.
node_added = False
depending_targets = set(dag.dangling(targets)[2])
for target in depending_targets:
if node_added:
depending_targets = set(dag.dangling(targets)[2])
node_added = False
if target not in depending_targets:
continue
# now we need to build DAG for existing...
mo = self.match(target)
if not mo:
# this is ok, this is just an existing target, no one is designed to
# generate it.
env.logger.debug(f'{target} already exists')
continue
if len(mo) > 1:
# this is not ok.
Expand Down Expand Up @@ -744,6 +683,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']
context['__dynamic_input__'] = res['dynamic_input']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
Expand Down Expand Up @@ -794,7 +734,9 @@ def initialize_dag(self, targets: Optional[List[str]] = [], nested: bool = False
context = {'__signature_vars__': signature_vars,
'__environ_vars__': environ_vars,
'__changed_vars__': changed_vars,
'__dynamic_depends__': res['dynamic_depends']}
'__dynamic_depends__': res['dynamic_depends'],
'__dynamic_input__': res['dynamic_input']
}

# for nested workflow, the input is specified by sos_run, not None.
if idx == 0:
Expand Down Expand Up @@ -903,10 +845,10 @@ def handle_dependent_target(self, dag, targets, runnable) -> int:
# for depending targets... they already exist but we will add
# nodes that generate them if available.
node_added = False
depending_targets = set(dag.dangling(targets)[2])
depending_targets = set(dag.dangling(targets)[1])
for target in depending_targets:
if node_added:
depending_targets = set(dag.dangling(targets)[2])
depending_targets = set(dag.dangling(targets)[1])
node_added = False
if target not in depending_targets:
continue
Expand Down Expand Up @@ -954,6 +896,7 @@ def handle_dependent_target(self, dag, targets, runnable) -> int:
context['__changed_vars__'] = res['changed_vars']
context['__default_output__'] = env.sos_dict['__default_output__']
context['__dynamic_depends__'] = res['dynamic_depends']
context['__dynamic_input__'] = res['dynamic_input']

# NOTE: If a step is called multiple times with different targets, it is much better
# to use different names because pydotplus can be very slow in handling graphs with nodes
Expand Down
2 changes: 1 addition & 1 deletion test/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2153,7 +2153,7 @@ def testReexecutionOfDynamicDepends(self):
Base_Executor(wf).run()
# if we run again, the step will be re-checked because of its depends statement
os.remove('a.bam')
res = Base_Executor(wf).run()
res = Base_Executor(wf, config={'trace_existing': True}).run()
self.assertEqual(res['__completed__']['__step_completed__'], 2)
self.assertEqual(res['__completed__']['__step_skipped__'], 1)

Expand Down

0 comments on commit ddadbf9

Please sign in to comment.