diff --git a/src/sos/dag.py b/src/sos/dag.py index dfacf03f0..57d360cc5 100644 --- a/src/sos/dag.py +++ b/src/sos/dag.py @@ -110,7 +110,6 @@ class SoS_DAG(nx.DiGraph): def __init__(self, *args, **kwargs): nx.DiGraph.__init__(self, *args, **kwargs) # all_input - self._all_input_files = defaultdict(list) self._all_depends_files = defaultdict(list) self._all_output_files = defaultdict(list) # index of mini @@ -133,8 +132,8 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets, self._all_output_files[sos_step(node_name.split(' ')[0])].append(node) for x in input_targets: - if node not in self._all_input_files[x]: - self._all_input_files[x].append(node) + if node not in self._all_depends_files[x]: + self._all_depends_files[x].append(node) for x in depends_targets: if node not in self._all_depends_files[x]: self._all_depends_files[x].append(node) @@ -149,8 +148,8 @@ def add_step(self, step_uuid, node_name, node_index, input_targets: sos_targets, def update_step(self, node, input_targets: sos_targets, output_targets: sos_targets, depends_targets: sos_targets): for x in input_targets: - if node not in self._all_input_files[x]: - self._all_input_files[x].append(node) + if node not in self._all_depends_files[x]: + self._all_depends_files[x].append(node) for x in depends_targets: if node not in self._all_depends_files[x]: self._all_depends_files[x].append(node) @@ -216,42 +215,34 @@ def circular_dependencies(self): def steps_depending_on(self, target: BaseTarget, workflow): if target in self._all_depends_files: return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_depends_files[target]])) - elif target in self._all_input_files: - return ' requested by ' + ', '.join(set([workflow.section_by_id(x._step_uuid).step_name() for x in self._all_input_files[target]])) - else: - return '' + return '' def pending(self): return [x for x in self.nodes() if x._status == 'failed'], [x for x in self.nodes() 
if x._status is None] def dangling(self, targets: sos_targets): - missing = [] - depending = [] + '''returns + 1. missing targets, which are missing from the DAG or from the provided targets + 2. existing targets of provided target list, not in DAG + ''' existing = [] - for x in self._all_input_files.keys(): - # for input files, if it exists, and not in output files, - # declear exist, if it is in output_files, good. - if x.target_exists(): - if x not in self._all_output_files: - existing.append(x) - # if it does not exist, and not in output_files, declear missing - elif x not in self._all_output_files: - missing.append(x) - for x in self._all_depends_files.keys(): - # for dependent files, if it exists, and not in output files - # we still try to find steps to satify it - if x.target_exists(): + missing = [] + if env.config['trace_existing']: + for x in self._all_depends_files.keys(): if x not in self._all_output_files: - depending.append(x) - elif x not in self._all_output_files: - missing.append(x) + if x.target_exists(): + existing.append(x) + else: + missing.append(x) + else: + missing = [x for x in self._all_depends_files.keys() if x not in self._all_output_files and not x.target_exists()] for x in targets: - if x.target_exists(): - if x not in self._all_output_files: - depending.append(x) - elif x not in self._all_output_files: - missing.append(x) - return missing, existing, depending + if x not in self._all_output_files: + if x.target_exists('target'): + existing.append(x) + else: + missing.append(x) + return missing, existing def regenerate_target(self, target: BaseTarget): if target in self._all_output_files: @@ -328,13 +319,6 @@ def build(self): for j in out_node: if j != i: self.add_edge(j, i) - # - for target, in_node in self._all_input_files.items(): - for out_node in [y for (x, y) in self._all_output_files.items() if x == target]: - for i in in_node: - for j in out_node: - if j != i: - self.add_edge(j, i) def save(self, dest=None): if not dest: diff 
--git a/src/sos/section_analyzer.py b/src/sos/section_analyzer.py index 894def9dd..c40c027a0 100644 --- a/src/sos/section_analyzer.py +++ b/src/sos/section_analyzer.py @@ -200,10 +200,12 @@ def get_step_input(section, default_input): '''Find step input ''' step_input: sos_targets = sos_targets() + dynamic_input = True + # look for input statement. input_idx = find_statement(section, 'input') if input_idx is None: - return step_input + return step_input, dynamic_input # input statement stmt = section.statements[input_idx][2] @@ -223,8 +225,10 @@ def get_step_input(section, default_input): step_input = sos_targets() else: step_input = default_input + dynamic_input = False elif not any(isinstance(x, (dynamic, remote)) for x in args): step_input = sos_targets(*args) + dynamic_input = True except SyntaxError: raise except Exception as e: @@ -236,7 +240,7 @@ def get_step_input(section, default_input): finally: [env.sos_dict._dict.pop(x) for x in svars] env.sos_dict._dict.update(old_values) - return step_input + return step_input, dynamic_input def get_step_output(section, default_output): '''determine step output''' @@ -375,7 +379,9 @@ def analyze_section(section: SoS_Step, default_input: Optional[sos_targets] = No 'changed_vars': get_changed_vars(section) } if not vars_and_output_only: - res['step_input'] = get_step_input(section, default_input) + inps = get_step_input(section, default_input) + res['step_input'] = inps[0] + res['dynamic_input'] = inps[1] deps = get_step_depends(section) res['step_depends'] = deps[0] res['dynamic_depends'] = deps[1] diff --git a/src/sos/step_executor.py b/src/sos/step_executor.py index 5d04ba062..2f8be66fc 100755 --- a/src/sos/step_executor.py +++ b/src/sos/step_executor.py @@ -340,6 +340,9 @@ def process_input_args(self, ifiles: sos_targets, **kwargs): assert isinstance(ifiles, sos_targets) + if env.sos_dict.get('__dynamic_input__', False): + self.verify_dynamic_targets([x for x in ifiles if isinstance(x, file_target)]) + # input file
is the filtered files env.sos_dict.set('step_input', ifiles) env.sos_dict.set('_input', ifiles) @@ -349,7 +352,7 @@ def process_input_args(self, ifiles: sos_targets, **kwargs): # return ifiles.groups - def verify_dynamic_depends(self, target): + def verify_dynamic_targets(self, target): return True def process_depends_args(self, dfiles: sos_targets, **kwargs): @@ -360,7 +363,7 @@ def process_depends_args(self, dfiles: sos_targets, **kwargs): raise ValueError(r"Depends needs to handle undetermined") if env.sos_dict.get('__dynamic_depends__', False): - self.verify_dynamic_depends([x for x in dfiles if isinstance(x, file_target)]) + self.verify_dynamic_targets([x for x in dfiles if isinstance(x, file_target)]) env.sos_dict.set('_depends', dfiles) env.sos_dict.set('step_depends', dfiles) @@ -1413,9 +1416,10 @@ def handle_unknown_target(self, e): if not res: raise e - def verify_dynamic_depends(self, targets): - if not targets: + def verify_dynamic_targets(self, targets): + if not targets or not env.config['trace_existing']: return + self.socket.send_pyobj(['dependent_target'] + targets) res = self.socket.recv() if res != b'target_resolved': diff --git a/src/sos/workflow_executor.py b/src/sos/workflow_executor.py index 131472271..39cb92697 100755 --- a/src/sos/workflow_executor.py +++ b/src/sos/workflow_executor.py @@ -529,6 +529,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= '__environ_vars__': environ_vars, '__changed_vars__': changed_vars, '__dynamic_depends__': res['dynamic_depends'], + '__dynamic_input__': res['dynamic_input'] } if idx == 0: context['__step_output__'] = env.sos_dict['__step_output__'] @@ -605,6 +606,8 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] context['__dynamic_depends__'] = res['dynamic_depends'] + context['__dynamic_input__'] = res['dynamic_input'] + # 
NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes # with identical names. @@ -620,8 +623,13 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= added_node += 1 resolved += 1 - # for existing targets... we should check if it actually exists. If - # not it would still need to be regenerated + # for existing targets that are not in DAG + if not env.config['trace_existing']: + if added_node == 0: + break + else: + continue + node_added = False existing_targets = set(dag.dangling(targets)[1]) for target in existing_targets: @@ -629,82 +637,13 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= existing_targets = set(dag.dangling(targets)[1]) node_added = False if target not in existing_targets: + # target already in DAG and not reported as existing continue - if file_target(target).target_exists('target') if isinstance(target, str) else target.target_exists('target'): - continue - mo = self.match(target) - if not mo: - # this is ok, this is just an existing target, no one is designed to - # generate it. - continue - if len(mo) > 1: - # this is not ok. 
- raise RuntimeError( - f'Multiple steps {", ".join(x.step_name() for x in mo)} to generate target {target}') - # - # only one step, we need to process it # execute section with specified input - # - if not isinstance(mo[0], tuple): - section = mo[0] - env.sos_dict['__default_output__'] = sos_targets(target) - context = {} - else: - section = mo[0][0] - if isinstance(mo[0][1], dict): - for k, v in mo[0][1].items(): - env.sos_dict.set(k, v) - - if mo[0][1]: - env.sos_dict['__default_output__'] = sos_targets(target) - context = {} - else: - env.sos_dict['__default_output__'] = sos_targets( - section.options['provides']) - context = mo[0][1] - # will become input, set to None - env.sos_dict['__step_output__'] = sos_targets() - # - res = analyze_section(section, default_output=env.sos_dict['__default_output__']) - # - # build DAG with input and output files of step - env.logger.debug( - f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}') - - context['__signature_vars__'] = res['signature_vars'] - context['__environ_vars__'] = res['environ_vars'] - context['__changed_vars__'] = res['changed_vars'] - context['__default_output__'] = env.sos_dict['__default_output__'] - context['__dynamic_depends__'] = res['dynamic_depends'] - - # NOTE: If a step is called multiple times with different targets, it is much better - # to use different names because pydotplus can be very slow in handling graphs with nodes - # with identical names. - node_name = section.step_name() - if env.sos_dict["__default_output__"]: - node_name += f' {short_repr(env.sos_dict["__default_output__"])})' - dag.add_step(section.uuid, node_name, - None, res['step_input'], - res['step_depends'], res['step_output'], context=context) - node_added = True - added_node += 1 - # this case do not count as resolved - # resolved += 1 - - # for depending targets... they already exist but we will add - # nodes that generates them if available. 
- node_added = False - depending_targets = set(dag.dangling(targets)[2]) - for target in depending_targets: - if node_added: - depending_targets = set(dag.dangling(targets)[2]) - node_added = False - if target not in depending_targets: - continue + # now we need to build DAG for existing... mo = self.match(target) if not mo: # this is ok, this is just an existing target, no one is designed to # generate it. - env.logger.debug(f'{target} already exists') continue if len(mo) > 1: # this is not ok. @@ -744,6 +683,7 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]= context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] context['__dynamic_depends__'] = res['dynamic_depends'] + context['__dynamic_input__'] = res['dynamic_input'] # NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes @@ -794,7 +734,9 @@ def initialize_dag(self, targets: Optional[List[str]] = [], nested: bool = False context = {'__signature_vars__': signature_vars, '__environ_vars__': environ_vars, '__changed_vars__': changed_vars, - '__dynamic_depends__': res['dynamic_depends']} + '__dynamic_depends__': res['dynamic_depends'], + '__dynamic_input__': res['dynamic_input'] + } # for nested workflow, the input is specified by sos_run, not None. if idx == 0: @@ -903,10 +845,10 @@ def handle_dependent_target(self, dag, targets, runnable) -> int: # for depending targets... they already exist but we will add # nodes that generates them if available. 
node_added = False - depending_targets = set(dag.dangling(targets)[2]) + depending_targets = set(dag.dangling(targets)[1]) for target in depending_targets: if node_added: - depending_targets = set(dag.dangling(targets)[2]) + depending_targets = set(dag.dangling(targets)[1]) node_added = False if target not in depending_targets: continue @@ -954,6 +896,7 @@ def handle_dependent_target(self, dag, targets, runnable) -> int: context['__changed_vars__'] = res['changed_vars'] context['__default_output__'] = env.sos_dict['__default_output__'] context['__dynamic_depends__'] = res['dynamic_depends'] + context['__dynamic_input__'] = res['dynamic_input'] # NOTE: If a step is called multiple times with different targets, it is much better # to use different names because pydotplus can be very slow in handling graphs with nodes diff --git a/test/test_execute.py b/test/test_execute.py index 07572e083..09f52fb6b 100644 --- a/test/test_execute.py +++ b/test/test_execute.py @@ -2153,7 +2153,7 @@ def testReexecutionOfDynamicDepends(self): Base_Executor(wf).run() # if we run again, because depends, the step will be re-checked os.remove('a.bam') - res = Base_Executor(wf).run() + res = Base_Executor(wf, config={'trace_existing': True}).run() self.assertEqual(res['__completed__']['__step_completed__'], 2) self.assertEqual(res['__completed__']['__step_skipped__'], 1)