diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4ca06f63991a6..4cc4fb69a54fc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -71,26 +71,31 @@ public class IngestService implements ClusterStateApplier { public static final String NOOP_PIPELINE_NAME = "_none"; private final ClusterService clusterService; - private final PipelineStore pipelineStore; - private final PipelineExecutionService pipelineExecutionService; + private final Map processorFactories; + // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. + // We know of all the processor factories when a node with all its plugin have been initialized. Also some + // processor factories rely on other node services. Custom metadata is statically registered when classes + // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. + private volatile Map pipelines = new HashMap<>(); + private final ThreadPool threadPool; + private final StatsHolder totalStats = new StatsHolder(); + private volatile Map statsHolderPerPipeline = Collections.emptyMap(); public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { this.clusterService = clusterService; - this.pipelineStore = new PipelineStore( - processorFactories( - ingestPlugins, - new Processor.Parameters( - env, scriptService, analysisRegistry, - threadPool.getThreadContext(), threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule( - TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command - ) + this.processorFactories = processorFactories( + ingestPlugins, + new Processor.Parameters( + env, scriptService, analysisRegistry, + threadPool.getThreadContext(), threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule( + TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command ) ) ); - this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); + this.threadPool = threadPool; } private static Map processorFactories(List ingestPlugins, @@ -114,8 +119,7 @@ public ClusterService getClusterService() { /** * Deletes the pipeline specified by id in the request. */ - public void delete(DeletePipelineRequest request, - ActionListener listener) { + public void delete(DeletePipelineRequest request, ActionListener listener) { clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) { @@ -198,32 +202,23 @@ static List innerGetPipelines(IngestMetadata ingestMetada return result; } - public void executeBulkRequest(Iterable> actionRequests, BiConsumer itemFailureHandler, - Consumer completionHandler) { - pipelineExecutionService.executeBulkRequest(actionRequests, itemFailureHandler, completionHandler); - } - - public IngestStats stats() { - return pipelineExecutionService.stats(); - } - /** * Stores the specified pipeline definition in the request. */ public void putPipeline(Map ingestInfos, PutPipelineRequest request, ActionListener listener) throws Exception { - pipelineStore.put(clusterService, ingestInfos, request, listener); + put(clusterService, ingestInfos, request, listener); } /** * Returns the pipeline by the specified id */ public Pipeline getPipeline(String id) { - return pipelineStore.get(id); + return pipelines.get(id); } public Map getProcessorFactories() { - return pipelineStore.getProcessorFactories(); + return processorFactories; } public IngestInfo info() { @@ -236,99 +231,64 @@ public IngestInfo info() { } Map pipelines() { - return pipelineStore.pipelines; - } - - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { - pipelineStore.validatePipeline(ingestInfos, request); - } - - void updatePipelineStats(IngestMetadata ingestMetadata) { - pipelineExecutionService.updatePipelineStats(ingestMetadata); + return pipelines; } @Override public void applyClusterState(final ClusterChangedEvent event) { ClusterState state = event.state(); - pipelineStore.innerUpdatePipelines(event.previousState(), state); + innerUpdatePipelines(event.previousState(), state); IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); if (ingestMetadata != null) { - pipelineExecutionService.updatePipelineStats(ingestMetadata); + updatePipelineStats(ingestMetadata); } } - public static final class PipelineStore { - - private final Map processorFactories; - - // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. - // We know of all the processor factories when a node with all its plugin have been initialized. Also some - // processor factories rely on other node services. Custom metadata is statically registered when classes - // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. - volatile Map pipelines = new HashMap<>(); - - private PipelineStore(Map processorFactories) { - this.processorFactories = processorFactories; - } - - void innerUpdatePipelines(ClusterState previousState, ClusterState state) { - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - return; + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); } - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); - if (Objects.equals(ingestMetadata, previousIngestMetadata)) { - return; + @Override + public String getType() { + return type; } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); + } - Map pipelines = new HashMap<>(); - List exceptions = new ArrayList<>(); - for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { - try { - pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); - } catch (ElasticsearchParseException e) { - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); - exceptions.add(e); - } catch (Exception e) { - ElasticsearchParseException parseException = new ElasticsearchParseException( - "Error updating pipeline with id [" + pipeline.getId() + "]", e); - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); - exceptions.add(parseException); - } - } - this.pipelines = Collections.unmodifiableMap(pipelines); - ExceptionsHelper.rethrowAndSuppress(exceptions); + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + Map pipelines; + if (currentIngestMetadata != null) { + pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); + } else { + pipelines = new HashMap<>(); } - private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { - String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; - String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; - String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; - Processor failureProcessor = new AbstractProcessor(tag) { - @Override - public void execute(IngestDocument ingestDocument) { - throw new IllegalStateException(errorMessage); - } - - @Override - public String getType() { - return type; - } - }; - String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; - return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); - } + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) + .build()); + return newState.build(); + } - /** - * Stores the specified pipeline definition in the request. - */ - public void put(ClusterService clusterService, Map ingestInfos, PutPipelineRequest request, - ActionListener listener) throws Exception { - // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); - clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { + /** + * Stores the specified pipeline definition in the request. + */ + public void put(ClusterService clusterService, Map ingestInfos, PutPipelineRequest request, + ActionListener listener) throws Exception { + // validates the pipeline and processor configuration before submitting a cluster update task: + validatePipeline(ingestInfos, request); + clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { @Override protected AcknowledgedResponse newResponse(boolean acknowledged) { @@ -340,222 +300,202 @@ public ClusterState execute(ClusterState currentState) { return innerPut(request, currentState); } }); - } - - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { - if (ingestInfos.isEmpty()) { - throw new IllegalStateException("Ingest info is empty"); - } + } - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); - List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { - for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { - String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; - exceptions.add( - ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) - ); - } - } - } - ExceptionsHelper.rethrowAndSuppress(exceptions); + void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + if (ingestInfos.isEmpty()) { + throw new IllegalStateException("Ingest info is empty"); } - static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { - IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); - Map pipelines; - if (currentIngestMetadata != null) { - pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); - } else { - pipelines = new HashMap<>(); + Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + List exceptions = new ArrayList<>(); + for (Processor processor : pipeline.flattenAllProcessors()) { + for (Map.Entry entry : ingestInfos.entrySet()) { + if (entry.getValue().containsProcessor(processor.getType()) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add( + ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) + ); + } } - - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) - .build()); - return newState.build(); - } - - /** - * Returns the pipeline by the specified id - */ - public Pipeline get(String id) { - return pipelines.get(id); - } - - public Map getProcessorFactories() { - return processorFactories; } + ExceptionsHelper.rethrowAndSuppress(exceptions); } - private static final class PipelineExecutionService { - - private final PipelineStore store; - private final ThreadPool threadPool; + public void executeBulkRequest(Iterable> actionRequests, + BiConsumer itemFailureHandler, Consumer completionHandler) { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { - private final StatsHolder totalStats = new StatsHolder(); - private volatile Map statsHolderPerPipeline = Collections.emptyMap(); - - PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { - this.store = store; - this.threadPool = threadPool; - } - - void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, - Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { - - @Override - public void onFailure(Exception e) { - completionHandler.accept(e); - } + @Override + public void onFailure(Exception e) { + completionHandler.accept(e); + } - @Override - protected void doRun() { - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = null; - if (actionRequest instanceof IndexRequest) { - indexRequest = (IndexRequest) actionRequest; - } else if (actionRequest instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) actionRequest; - indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); - } - if (indexRequest == null) { - continue; - } - String pipeline = indexRequest.getPipeline(); - if (NOOP_PIPELINE_NAME.equals(pipeline) == false) { - try { - innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); + @Override + protected void doRun() { + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = null; + if (actionRequest instanceof IndexRequest) { + indexRequest = (IndexRequest) actionRequest; + } else if (actionRequest instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) actionRequest; + indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); + } + if (indexRequest == null) { + continue; + } + String pipelineId = indexRequest.getPipeline(); + if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { + try { + Pipeline pipeline = pipelines.get(pipelineId); + if (pipeline == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } + innerExecute(indexRequest, pipeline); + //this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } catch (Exception e) { + itemFailureHandler.accept(indexRequest, e); } } - completionHandler.accept(null); } - }); - } - - IngestStats stats() { - Map statsHolderPerPipeline = this.statsHolderPerPipeline; - - Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); - for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { - statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); + completionHandler.accept(null); } + }); + } - return new IngestStats(totalStats.createStats(), statsPerPipeline); + public IngestStats stats() { + Map statsHolderPerPipeline = this.statsHolderPerPipeline; + + Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); + for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { + statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); } - void updatePipelineStats(IngestMetadata ingestMetadata) { - boolean changed = false; - Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); - Iterator iterator = newStatsPerPipeline.keySet().iterator(); - while (iterator.hasNext()) { - String pipeline = iterator.next(); - if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { - iterator.remove(); - changed = true; - } - } - for (String pipeline : ingestMetadata.getPipelines().keySet()) { - if (newStatsPerPipeline.containsKey(pipeline) == false) { - newStatsPerPipeline.put(pipeline, new StatsHolder()); - changed = true; - } - } + return new IngestStats(totalStats.createStats(), statsPerPipeline); + } - if (changed) { - statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); + void updatePipelineStats(IngestMetadata ingestMetadata) { + boolean changed = false; + Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); + Iterator iterator = newStatsPerPipeline.keySet().iterator(); + while (iterator.hasNext()) { + String pipeline = iterator.next(); + if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { + iterator.remove(); + changed = true; } } - - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { - if (pipeline.getProcessors().isEmpty()) { - return; + for (String pipeline : ingestMetadata.getPipelines().keySet()) { + if (newStatsPerPipeline.containsKey(pipeline) == false) { + newStatsPerPipeline.put(pipeline, new StatsHolder()); + changed = true; } + } - long startTimeInNanos = System.nanoTime(); - // the pipeline specific stat holder may not exist and that is fine: - // (e.g. the pipeline may have been removed while we're ingesting a document - Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); - try { - totalStats.preIngest(); - pipelineStats.ifPresent(StatsHolder::preIngest); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - //it's fine to set all metadata fields all the time, as ingest document holds their starting values - //before ingestion, which might also get modified during ingestion. - indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); - } - indexRequest.source(ingestDocument.getSourceAndMetadata()); - } catch (Exception e) { - totalStats.ingestFailed(); - pipelineStats.ifPresent(StatsHolder::ingestFailed); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalStats.postIngest(ingestTimeInMillis); - pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); - } + if (changed) { + statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); + } + } + + private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + if (pipeline.getProcessors().isEmpty()) { + return; } - private Pipeline getPipeline(String pipelineId) { - Pipeline pipeline = store.get(pipelineId); - if (pipeline == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + long startTimeInNanos = System.nanoTime(); + // the pipeline specific stat holder may not exist and that is fine: + // (e.g. the pipeline may have been removed while we're ingesting a document + Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); + try { + totalStats.preIngest(); + pipelineStats.ifPresent(StatsHolder::preIngest); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument); + + Map metadataMap = ingestDocument.extractMetadata(); + //it's fine to set all metadata fields all the time, as ingest document holds their starting values + //before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); } - return pipeline; + indexRequest.source(ingestDocument.getSourceAndMetadata()); + } catch (Exception e) { + totalStats.ingestFailed(); + pipelineStats.ifPresent(StatsHolder::ingestFailed); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalStats.postIngest(ingestTimeInMillis); + pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); } + } - private static class StatsHolder { + private void innerUpdatePipelines(ClusterState previousState, ClusterState state) { + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } - private final MeanMetric ingestMetric = new MeanMetric(); - private final CounterMetric ingestCurrent = new CounterMetric(); - private final CounterMetric ingestFailed = new CounterMetric(); + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); + if (Objects.equals(ingestMetadata, previousIngestMetadata)) { + return; + } - void preIngest() { - ingestCurrent.inc(); + Map pipelines = new HashMap<>(); + List exceptions = new ArrayList<>(); + for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + } catch (ElasticsearchParseException e) { + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); + exceptions.add(e); + } catch (Exception e) { + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); } + } + this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); + } - void postIngest(long ingestTimeInMillis) { - ingestCurrent.dec(); - ingestMetric.inc(ingestTimeInMillis); - } + private static class StatsHolder { - void ingestFailed() { - ingestFailed.inc(); - } + private final MeanMetric ingestMetric = new MeanMetric(); + private final CounterMetric ingestCurrent = new CounterMetric(); + private final CounterMetric ingestFailed = new CounterMetric(); - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); - } + void preIngest() { + ingestCurrent.inc(); + } + void postIngest(long ingestTimeInMillis) { + ingestCurrent.dec(); + ingestMetric.inc(ingestTimeInMillis); } + void ingestFailed() { + ingestFailed.inc(); + } + + IngestStats.Stats createStats() { + return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); + } } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 10516dc0d0126..83a5bef4de279 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -206,7 +206,7 @@ public void testCrud() throws Exception { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -233,7 +233,7 @@ public void testPut() { // add a new pipeline: PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -245,7 +245,7 @@ public void testPut() { putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); pipeline = ingestService.getPipeline(id); assertThat(pipeline, notNullValue()); @@ -264,7 +264,7 @@ public void testPutWithErrorResponse() { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); try { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); fail("should fail"); @@ -439,7 +439,7 @@ public String getType() { PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); @@ -467,7 +467,7 @@ public void testExecuteBulkPipelineDoesNotExist() { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); BulkRequest bulkRequest = new BulkRequest(); @@ -507,7 +507,7 @@ public void testExecuteSuccess() { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -525,7 +525,7 @@ public void testExecuteEmptyPipeline() throws Exception { new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") @@ -545,7 +545,7 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final long newVersion = randomLong(); final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); @@ -587,7 +587,7 @@ public void testExecuteFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) @@ -616,7 +616,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); @@ -645,7 +645,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) @@ -700,7 +700,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") @@ -734,7 +734,7 @@ public void testBulkRequestExecution() { new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") @@ -762,12 +762,12 @@ public void testStats() { new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); putRequest = new PutPipelineRequest("_id2", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); previousClusterState = clusterState; - clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final Map configurationMap = new HashMap<>(); configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));