

[ML] Handle parsing ingest processors where the definition is not an object (elastic#113697)
davidkyle committed Oct 1, 2024
1 parent 283bb28 commit f1759cd
Showing 3 changed files with 60 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/113697.yaml
@@ -0,0 +1,6 @@
+ pr: 113697
+ summary: Handle parsing ingest processors where definition is not a object
+ area: Machine Learning
+ type: bug
+ issues:
+ - 113615
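
For context: the extractor walks each pipeline's processors list, where every entry maps a processor name to its definition. The pre-fix code cast every definition to Map<String, Object>, so a definition that is not an object, such as the plain-string script definition "ctx.test=2;" used by the new test below, made that cast fail. A minimal sketch of the failure mode in plain Java (illustrative names only, not Elasticsearch APIs):

import java.util.Map;

// Minimal sketch of the pre-fix failure mode; class and variable names are illustrative.
public class NonObjectDefinitionSketch {
    public static void main(String[] args) {
        // A processor entry whose definition is a String rather than an object,
        // mirroring the "script" -> "ctx.test=2;" shape used in the new test.
        Map<String, Object> processorConfigWithKey = Map.of("script", "ctx.test=2;");

        for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
            try {
                // The removed code cast every definition to a Map, as sketched here.
                Map<?, ?> definition = (Map<?, ?>) entry.getValue();
                System.out.println("object definition: " + definition);
            } catch (ClassCastException e) {
                // A String definition ends up here; the fix now skips it instead of throwing.
                System.out.println("non-object definition: " + entry.getValue());
            }
        }
    }
}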
@@ -85,13 +85,7 @@ public static Set<String> getModelIdsFromInferenceProcessors(IngestMetadata inge
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
- addModelsAndPipelines(
- entry.getKey(),
- pipelineId,
- (Map<String, Object>) entry.getValue(),
- pam -> modelIds.add(pam.modelIdOrAlias()),
- 0
- );
+ addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> modelIds.add(pam.modelIdOrAlias()), 0);
}
}
});
@@ -119,7 +113,7 @@ public static Map<String, Set<String>> pipelineIdsByResource(ClusterState state,
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
- addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), pam -> {
+ addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> {
if (ids.contains(pam.modelIdOrAlias)) {
pipelineIdsByModelIds.computeIfAbsent(pam.modelIdOrAlias, m -> new LinkedHashSet<>()).add(pipelineId);
}
@@ -151,7 +145,7 @@ public static Set<String> pipelineIdsForResource(ClusterState state, Set<String>
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
- addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), pam -> {
+ addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> {
if (ids.contains(pam.modelIdOrAlias)) {
pipelineIds.add(pipelineId);
}
@@ -166,7 +160,7 @@ public static Set<String> pipelineIdsForResource(ClusterState state, Set<String>
private static void addModelsAndPipelines(
String processorType,
String pipelineId,
- Map<String, Object> processorDefinition,
+ Object processorDefinition,
Consumer<PipelineAndModel> handler,
int level
) {
@@ -178,14 +172,16 @@ private static void addModelsAndPipelines(
return;
}
if (InferenceProcessorConstants.TYPE.equals(processorType)) {
- String modelId = (String) processorDefinition.get(MODEL_ID_RESULTS_FIELD);
- if (modelId != null) {
- handler.accept(new PipelineAndModel(pipelineId, modelId));
+ if (processorDefinition instanceof Map<?, ?> definitionMap) {
+ String modelId = (String) definitionMap.get(MODEL_ID_RESULTS_FIELD);
+ if (modelId != null) {
+ handler.accept(new PipelineAndModel(pipelineId, modelId));
+ }
}
return;
}
- if (FOREACH_PROCESSOR_NAME.equals(processorType)) {
- Map<String, Object> innerProcessor = (Map<String, Object>) processorDefinition.get("processor");
+ if (FOREACH_PROCESSOR_NAME.equals(processorType) && processorDefinition instanceof Map<?, ?> definitionMap) {
+ Map<String, Object> innerProcessor = (Map<String, Object>) definitionMap.get("processor");
if (innerProcessor != null) {
// a foreach processor should only have a SINGLE nested processor. Iteration is for simplicity's sake.
for (Map.Entry<String, Object> innerProcessorWithName : innerProcessor.entrySet()) {
@@ -200,18 +196,16 @@ private static void addModelsAndPipelines(
}
return;
}
- if (processorDefinition.containsKey(Pipeline.ON_FAILURE_KEY)) {
+ if (processorDefinition instanceof Map<?, ?> definitionMap && definitionMap.containsKey(Pipeline.ON_FAILURE_KEY)) {
List<Map<String, Object>> onFailureConfigs = ConfigurationUtils.readList(
null,
null,
- processorDefinition,
+ (Map<String, Object>) definitionMap,
Pipeline.ON_FAILURE_KEY
);
onFailureConfigs.stream()
.flatMap(map -> map.entrySet().stream())
- .forEach(
- entry -> addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), handler, level + 1)
- );
+ .forEach(entry -> addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), handler, level + 1));
}
}
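
The reworked method accepts the definition as Object and only descends into it when it is actually a map, relying on pattern matching for instanceof; anything else, such as a String, is silently ignored. A condensed sketch of that guard with a simplified, hypothetical signature:

import java.util.Map;
import java.util.function.Consumer;

// Condensed sketch of the guard used above; the signature is a simplified stand-in,
// and "model_id" stands in for MODEL_ID_RESULTS_FIELD.
final class DefinitionGuardSketch {
    static void collectModelId(Object processorDefinition, Consumer<String> handler) {
        if (processorDefinition instanceof Map<?, ?> definitionMap) {
            // Only map-shaped definitions are inspected for a model id.
            if (definitionMap.get("model_id") instanceof String modelId) {
                handler.accept(modelId);
            }
        }
        // Non-map definitions simply fall through instead of being cast and throwing.
    }
}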

@@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -140,6 +141,18 @@ public void testNumInferenceProcessorsRecursivelyDefined() throws IOException {
assertThat(InferenceProcessorInfoExtractor.countInferenceProcessors(cs), equalTo(3));
}

+ public void testScriptProcessorStringConfig() throws IOException {
+ Set<String> expectedModelIds = Set.of("foo");
+
+ ClusterState clusterState = buildClusterStateWithPipelineConfigurations(
+ Map.of("processor_does_not_have_a_definition_object", newConfigurationWithScriptProcessor("foo"))
+ );
+ IngestMetadata ingestMetadata = clusterState.metadata().custom(IngestMetadata.TYPE);
+ Set<String> actualModelIds = InferenceProcessorInfoExtractor.getModelIdsFromInferenceProcessors(ingestMetadata);
+
+ assertThat(actualModelIds, equalTo(expectedModelIds));
+ }

private static PipelineConfiguration newConfigurationWithOutInferenceProcessor(int i) throws IOException {
try (
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
@@ -173,6 +186,12 @@ private static ClusterState buildClusterStateWithModelReferences(int numPipeline
for (int i = 0; i < numPipelinesWithoutModel; i++) {
configurations.put("pipeline_without_model_" + i, newConfigurationWithOutInferenceProcessor(i));
}

+ return buildClusterStateWithPipelineConfigurations(configurations);
+ }
+
+ private static ClusterState buildClusterStateWithPipelineConfigurations(Map<String, PipelineConfiguration> configurations)
+ throws IOException {
IngestMetadata ingestMetadata = new IngestMetadata(configurations);

return ClusterState.builder(new ClusterName("_name"))
@@ -206,6 +225,24 @@ private static PipelineConfiguration newConfigurationWithForeachProcessorProcess
}
}

+ private static PipelineConfiguration newConfigurationWithScriptProcessor(String modelId) throws IOException {
+ try (
+ XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
+ .map(
+ Collections.singletonMap(
+ "processors",
+ List.of(forScriptProcessorWithStringConfig(), forEachProcessorWithInference(modelId))
+ )
+ )
+ ) {
+ return new PipelineConfiguration(
+ "pipeline_with_script_and_model_" + modelId,
+ BytesReference.bytes(xContentBuilder),
+ XContentType.JSON
+ );
+ }
+ }

private static Map<String, Object> forEachProcessorWithInference(String modelId) {
return Collections.singletonMap("foreach", new HashMap<>() {
{
@@ -229,4 +266,7 @@ private static Map<String, Object> inferenceProcessorForModel(String modelId) {
});
}

+ private static Map<String, Object> forScriptProcessorWithStringConfig() {
+ return Collections.singletonMap("script", "ctx.test=2;");
+ }
}
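
Putting the new helpers together: the pipeline built by newConfigurationWithScriptProcessor pairs a string-valued script definition with a foreach/inference definition, so the extractor must skip the former while still finding model "foo" in the latter. A rough sketch of the resulting processors list as plain Java maps (the foreach body is abridged because forEachProcessorWithInference is collapsed in this view, and "model_id" stands in for the constant used by the inference processor):

import java.util.List;
import java.util.Map;

// Rough shape of the processors list the new test builds; the foreach body is a
// placeholder, since the real helper is collapsed in this diff view.
final class TestPipelineShapeSketch {
    static List<Map<String, Object>> processors(String modelId) {
        return List.of(
            // Definition is a String, not an object; previously this entry broke parsing.
            Map.of("script", "ctx.test=2;"),
            // Abridged foreach wrapping a nested inference processor for the given model.
            Map.of("foreach", Map.of("processor", Map.of("inference", Map.of("model_id", modelId))))
        );
    }
}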
