diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/mapping/IterationMapping.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/mapping/IterationMapping.java index 752c0a45..bf0c5551 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/mapping/IterationMapping.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/mapping/IterationMapping.java @@ -29,15 +29,18 @@ public class IterationMapping { private String collection; private String item; + private String index; private String identity; @JsonCreator public IterationMapping( @JsonProperty("collection") String collection, @JsonProperty("item") String item, + @JsonProperty("index") String index, @JsonProperty("identity") String identity) { this.collection = collection; this.item = item; + this.index = index; this.identity = identity; } } diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/ForeachTaskRunner.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/ForeachTaskRunner.java index 073a7636..30b812a2 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/ForeachTaskRunner.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/ForeachTaskRunner.java @@ -112,9 +112,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map taskInfoMap = TaskInfoMaker.getMaker().makeTaskInfos(foreachTask.getTasks(), taskInfo, groupIndex); Set subTaskInfos = new HashSet<>(taskInfoMap.values()); - Map subContext = Maps.newConcurrentMap(); - subContext.putAll(input); - subContext.put(iterationMapping.getItem(), item); + Map subContext = buildSubContext(input, item, iterationMapping, groupIndex); // record whether the subtask is key if (existKeyExp(taskInfo)) { for (TaskInfo subTaskInfo : subTaskInfos) { @@ -146,6 +144,17 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map buildSubContext(Map input, Object item, + IterationMapping iterationMapping, int groupIndex) { + Map subContext = Maps.newConcurrentMap(); + subContext.putAll(input); + subContext.put(iterationMapping.getItem(), item); + if (StringUtils.isNotEmpty(iterationMapping.getIndex())) { + subContext.put(iterationMapping.getIndex(), groupIndex); + } + return subContext; + } + private boolean isKeySubTask(String executionId, Map subContext, TaskInfo it) { return !stasher.needStash(executionId, it, subContext); } diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy index d4d28581..d1727e55 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/ForeachTaskTraversalTest.groovy @@ -198,6 +198,7 @@ class ForeachTaskTraversalTest extends Specification { " iterationMapping:\n" + " collection: \$.input.segments\n" + " item: segmentUrl\n" + + " index: subIndex\n" + " outputMappings:\n" + " - target: \$.context.gopUrls\n" + " source: \$.output.sub_context.[*].gopUrl\n" + @@ -210,6 +211,8 @@ class ForeachTaskTraversalTest extends Specification { " inputMappings:\n" + " - target: \$.input.segmentUrl\n" + " source: \$.context.segmentUrl\n" + + " - target: \$.input.subIndex\n" + + " source: \$.context.subIndex\n" + " outputMappings:\n" + " - target: \$.context.gopUrl\n" + " source: \$.output.gopUrl\n" + @@ -262,8 +265,8 @@ class ForeachTaskTraversalTest extends Specification { ((DAGCallbackInfo) event.data).context == ['url': 'http://test.com/result', 'segments': ['gopUrl1', 'gopUrl2'], 'gopUrls': ['gopResultUrl2', 'gopResultUrl1']] }) 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'A' && it.input == ['url': 'http://test.com/test'] }) - 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_0-B1' && it.input == ['segmentUrl': 'gopUrl1'] }) - 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_1-B1' && it.input == ['segmentUrl': 'gopUrl2'] }) + 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_0-B1' && it.input == ['segmentUrl': 'gopUrl1', 'subIndex': 0] }) + 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'B_1-B1' && it.input == ['segmentUrl': 'gopUrl2', 'subIndex': 1] }) 1 * dispatcher.dispatch({ it -> it.taskInfo.name == 'C' && it.input == ['gopUrls': ['gopResultUrl2', 'gopResultUrl1']] }) }