From f1759cdd17a29f794f4792bab7759df81e3028c6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 1 Oct 2024 09:09:47 +0100 Subject: [PATCH] [ML] Handle parsing ingest processors where the definition is not an object (#113697) --- docs/changelog/113697.yaml | 6 +++ .../InferenceProcessorInfoExtractor.java | 34 +++++++--------- .../InferenceProcessorInfoExtractorTests.java | 40 +++++++++++++++++++ 3 files changed, 60 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/113697.yaml diff --git a/docs/changelog/113697.yaml b/docs/changelog/113697.yaml new file mode 100644 index 0000000000000..1362e01fcc89b --- /dev/null +++ b/docs/changelog/113697.yaml @@ -0,0 +1,6 @@ +pr: 113697 +summary: Handle parsing ingest processors where definition is not an object +area: Machine Learning +type: bug +issues: + - 113615 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java index 1be495a8a82f5..e61342d281c90 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java @@ -85,13 +85,7 @@ public static Set getModelIdsFromInferenceProcessors(IngestMetadata inge List> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { - addModelsAndPipelines( - entry.getKey(), - pipelineId, - (Map) entry.getValue(), - pam -> modelIds.add(pam.modelIdOrAlias()), - 0 - ); + addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> modelIds.add(pam.modelIdOrAlias()), 0); } } }); @@ -119,7 +113,7 @@ public static Map> pipelineIdsByResource(ClusterState state, List> processorConfigs = 
ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { - addModelsAndPipelines(entry.getKey(), pipelineId, (Map) entry.getValue(), pam -> { + addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> { if (ids.contains(pam.modelIdOrAlias)) { pipelineIdsByModelIds.computeIfAbsent(pam.modelIdOrAlias, m -> new LinkedHashSet<>()).add(pipelineId); } @@ -151,7 +145,7 @@ public static Set pipelineIdsForResource(ClusterState state, Set List> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { - addModelsAndPipelines(entry.getKey(), pipelineId, (Map) entry.getValue(), pam -> { + addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), pam -> { if (ids.contains(pam.modelIdOrAlias)) { pipelineIds.add(pipelineId); } @@ -166,7 +160,7 @@ public static Set pipelineIdsForResource(ClusterState state, Set private static void addModelsAndPipelines( String processorType, String pipelineId, - Map processorDefinition, + Object processorDefinition, Consumer handler, int level ) { @@ -178,14 +172,16 @@ private static void addModelsAndPipelines( return; } if (InferenceProcessorConstants.TYPE.equals(processorType)) { - String modelId = (String) processorDefinition.get(MODEL_ID_RESULTS_FIELD); - if (modelId != null) { - handler.accept(new PipelineAndModel(pipelineId, modelId)); + if (processorDefinition instanceof Map definitionMap) { + String modelId = (String) definitionMap.get(MODEL_ID_RESULTS_FIELD); + if (modelId != null) { + handler.accept(new PipelineAndModel(pipelineId, modelId)); + } } return; } - if (FOREACH_PROCESSOR_NAME.equals(processorType)) { - Map innerProcessor = (Map) processorDefinition.get("processor"); + if (FOREACH_PROCESSOR_NAME.equals(processorType) && 
processorDefinition instanceof Map definitionMap) { + Map innerProcessor = (Map) definitionMap.get("processor"); if (innerProcessor != null) { // a foreach processor should only have a SINGLE nested processor. Iteration is for simplicity's sake. for (Map.Entry innerProcessorWithName : innerProcessor.entrySet()) { @@ -200,18 +196,16 @@ private static void addModelsAndPipelines( } return; } - if (processorDefinition.containsKey(Pipeline.ON_FAILURE_KEY)) { + if (processorDefinition instanceof Map definitionMap && definitionMap.containsKey(Pipeline.ON_FAILURE_KEY)) { List> onFailureConfigs = ConfigurationUtils.readList( null, null, - processorDefinition, + (Map) definitionMap, Pipeline.ON_FAILURE_KEY ); onFailureConfigs.stream() .flatMap(map -> map.entrySet().stream()) - .forEach( - entry -> addModelsAndPipelines(entry.getKey(), pipelineId, (Map) entry.getValue(), handler, level + 1) - ); + .forEach(entry -> addModelsAndPipelines(entry.getKey(), pipelineId, entry.getValue(), handler, level + 1)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractorTests.java index a5d823fc2144f..d62360e895476 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractorTests.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -140,6 +141,18 @@ public void testNumInferenceProcessorsRecursivelyDefined() throws IOException { assertThat(InferenceProcessorInfoExtractor.countInferenceProcessors(cs), equalTo(3)); } + public void testScriptProcessorStringConfig() throws IOException { + Set expectedModelIds = Set.of("foo"); 
+ + ClusterState clusterState = buildClusterStateWithPipelineConfigurations( + Map.of("processor_does_not_have_a_definition_object", newConfigurationWithScriptProcessor("foo")) + ); + IngestMetadata ingestMetadata = clusterState.metadata().custom(IngestMetadata.TYPE); + Set actualModelIds = InferenceProcessorInfoExtractor.getModelIdsFromInferenceProcessors(ingestMetadata); + + assertThat(actualModelIds, equalTo(expectedModelIds)); + } + private static PipelineConfiguration newConfigurationWithOutInferenceProcessor(int i) throws IOException { try ( XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() @@ -173,6 +186,12 @@ private static ClusterState buildClusterStateWithModelReferences(int numPipeline for (int i = 0; i < numPipelinesWithoutModel; i++) { configurations.put("pipeline_without_model_" + i, newConfigurationWithOutInferenceProcessor(i)); } + + return buildClusterStateWithPipelineConfigurations(configurations); + } + + private static ClusterState buildClusterStateWithPipelineConfigurations(Map configurations) + throws IOException { IngestMetadata ingestMetadata = new IngestMetadata(configurations); return ClusterState.builder(new ClusterName("_name")) @@ -206,6 +225,24 @@ private static PipelineConfiguration newConfigurationWithForeachProcessorProcess } } + private static PipelineConfiguration newConfigurationWithScriptProcessor(String modelId) throws IOException { + try ( + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() + .map( + Collections.singletonMap( + "processors", + List.of(forScriptProcessorWithStringConfig(), forEachProcessorWithInference(modelId)) + ) + ) + ) { + return new PipelineConfiguration( + "pipeline_with_script_and_model_" + modelId, + BytesReference.bytes(xContentBuilder), + XContentType.JSON + ); + } + } + private static Map forEachProcessorWithInference(String modelId) { return Collections.singletonMap("foreach", new HashMap<>() { { @@ -229,4 +266,7 @@ private static Map inferenceProcessorForModel(String 
modelId) { }); } + private static Map forScriptProcessorWithStringConfig() { + return Collections.singletonMap("script", "ctx.test=2;"); + } }