Skip to content

Commit

Permalink
add index into sub_context in foreach task (#78)
Browse files Browse the repository at this point in the history
* add index into sub_context in foreach task

---------

Co-authored-by: zeyu10 <zeyu10@staff.sina.com>
  • Loading branch information
techloghub and zeyu10 committed Aug 19, 2024
1 parent f40e732 commit 76924ec
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
public class IterationMapping {
private String collection;
private String item;
private String index;
private String identity;

@JsonCreator
public IterationMapping(
@JsonProperty("collection") String collection,
@JsonProperty("item") String item,
@JsonProperty("index") String index,
@JsonProperty("identity") String identity) {
this.collection = collection;
this.item = item;
this.index = index;
this.identity = identity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<Strin
Map<String, TaskInfo> taskInfoMap = TaskInfoMaker.getMaker().makeTaskInfos(foreachTask.getTasks(), taskInfo, groupIndex);
Set<TaskInfo> subTaskInfos = new HashSet<>(taskInfoMap.values());

Map<String, Object> subContext = Maps.newConcurrentMap();
subContext.putAll(input);
subContext.put(iterationMapping.getItem(), item);
Map<String, Object> subContext = buildSubContext(input, item, iterationMapping, groupIndex);
// record whether the subtask is key
if (existKeyExp(taskInfo)) {
for (TaskInfo subTaskInfo : subTaskInfos) {
Expand Down Expand Up @@ -146,6 +144,17 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<Strin
return ExecutionResult.builder().taskStatus(taskInfo.getTaskStatus()).subTaskInfosAndContext(readyToRun).build();
}

private static Map<String, Object> buildSubContext(Map<String, Object> input, Object item,
IterationMapping iterationMapping, int groupIndex) {
Map<String, Object> subContext = Maps.newConcurrentMap();
subContext.putAll(input);
subContext.put(iterationMapping.getItem(), item);
if (StringUtils.isNotEmpty(iterationMapping.getIndex())) {
subContext.put(iterationMapping.getIndex(), groupIndex);
}
return subContext;
}

private boolean isKeySubTask(String executionId, Map<String, Object> subContext, TaskInfo it) {
return !stasher.needStash(executionId, it, subContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class ForeachTaskTraversalTest extends Specification {
" iterationMapping:\n" +
" collection: \$.input.segments\n" +
" item: segmentUrl\n" +
" index: subIndex\n" +
" outputMappings:\n" +
" - target: \$.context.gopUrls\n" +
" source: \$.output.sub_context.[*].gopUrl\n" +
Expand All @@ -210,6 +211,8 @@ class ForeachTaskTraversalTest extends Specification {
" inputMappings:\n" +
" - target: \$.input.segmentUrl\n" +
" source: \$.context.segmentUrl\n" +
" - target: \$.input.subIndex\n" +
" source: \$.context.subIndex\n" +
" outputMappings:\n" +
" - target: \$.context.gopUrl\n" +
" source: \$.output.gopUrl\n" +
Expand Down Expand Up @@ -262,8 +265,8 @@ class ForeachTaskTraversalTest extends Specification {
((DAGCallbackInfo) event.data).context == ['url': 'http://test.com/result', 'segments': ['gopUrl1', 'gopUrl2'], 'gopUrls': ['gopResultUrl2', 'gopResultUrl1']]
})
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'A' && it.input == ['url': 'http://test.com/test'] })
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_0-B1' && it.input == ['segmentUrl': 'gopUrl1'] })
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_1-B1' && it.input == ['segmentUrl': 'gopUrl2'] })
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_0-B1' && it.input == ['segmentUrl': 'gopUrl1', 'subIndex': 0] })
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_1-B1' && it.input == ['segmentUrl': 'gopUrl2', 'subIndex': 1] })
1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'C' && it.input == ['gopUrls': ['gopResultUrl2', 'gopResultUrl1']] })
}

Expand Down

0 comments on commit 76924ec

Please sign in to comment.