diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 024389dd22c7f..6c536a7019bb0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -41,6 +41,9 @@ default boolean runOnlyOnMaster() { /** * Callback invoked after new cluster state is published. Note that * this method is not invoked if the cluster state was not updated. + * + * Note that this method will be executed using system context. + * * @param clusterChangedEvent the change event for this cluster state change, containing * both old and new states */ diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index b298e7e915dea..9dc9c7f6f52d0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -62,6 +62,12 @@ public String describeTasks(List tasks) { */ public abstract void onFailure(String source, Exception e); + @Override + public final void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + // final, empty implementation here as this method should only be defined in combination + // with a batching executor as it will always be executed within the system context. + } + /** * If the cluster state update task wasn't processed by the provided timeout, call * {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default). diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 4432d864fd36a..2543be4811c1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.threadpool.ThreadPool; @@ -59,6 +60,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING; @@ -426,26 +428,28 @@ public TimeValue getMaxTaskWaitTime() { return threadPoolExecutor.getMaxTaskWaitTime(); } - private SafeClusterStateTaskListener safe(ClusterStateTaskListener listener) { + private SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, Supplier contextSupplier) { if (listener instanceof AckedClusterStateTaskListener) { - return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, logger); + return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, contextSupplier, logger); } else { - return new SafeClusterStateTaskListener(listener, logger); + return new SafeClusterStateTaskListener(listener, contextSupplier, logger); } } private static class SafeClusterStateTaskListener implements ClusterStateTaskListener { private final ClusterStateTaskListener listener; + protected final Supplier context; private final Logger logger; - SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) { + SafeClusterStateTaskListener(ClusterStateTaskListener listener, Supplier context, Logger logger) { this.listener = listener; + this.context = context; this.logger = logger; } @Override public void onFailure(String source, Exception e) { - try { + try (ThreadContext.StoredContext ignore = context.get()) { listener.onFailure(source, e); } catch (Exception inner) { inner.addSuppressed(e); @@ -456,7 +460,7 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { - try { + try (ThreadContext.StoredContext ignore = context.get()) { listener.onNoLongerMaster(source); } catch (Exception e) { logger.error(() -> new ParameterizedMessage( @@ -466,7 +470,7 @@ public void onNoLongerMaster(String source) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - try { + try (ThreadContext.StoredContext ignore = context.get()) { listener.clusterStateProcessed(source, oldState, newState); } catch (Exception e) { logger.error(() -> new ParameterizedMessage( @@ -480,8 +484,9 @@ private static class SafeAckedClusterStateTaskListener extends SafeClusterStateT private final AckedClusterStateTaskListener listener; private final Logger logger; - SafeAckedClusterStateTaskListener(AckedClusterStateTaskListener listener, Logger logger) { - super(listener, logger); + SafeAckedClusterStateTaskListener(AckedClusterStateTaskListener listener, Supplier context, + Logger logger) { + super(listener, context, logger); this.listener = listener; this.logger = logger; } @@ -493,7 +498,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) { @Override public void onAllNodesAcked(@Nullable Exception e) { - try { + try (ThreadContext.StoredContext ignore = context.get()) { listener.onAllNodesAcked(e); } catch (Exception inner) { inner.addSuppressed(e); @@ -503,7 +508,7 @@ public void onAllNodesAcked(@Nullable Exception e) { @Override public void onAckTimeout() { - try { + try (ThreadContext.StoredContext ignore = context.get()) { listener.onAckTimeout(); } catch (Exception e) { logger.error("exception thrown by listener while notifying on ack timeout", e); @@ -724,9 +729,13 @@ public void submitStateUpdateTasks(final String source, if (!lifecycle.started()) { return; } - try { + final ThreadContext threadContext = threadPool.getThreadContext(); + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + List safeTasks = tasks.entrySet().stream() - .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor)) + .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor)) .collect(Collectors.toList()); taskBatcher.submitTasks(safeTasks, config.timeout()); } catch (EsRejectedExecutionException e) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index e37f46c5517db..c86ea61980a87 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -556,7 +556,6 @@ public ClusterStateResponse newInstance() { @Override public void handleResponse(ClusterStateResponse response) { - assert transportService.getThreadPool().getThreadContext().isSystemContext() == false : "context is a system context"; try { if (remoteClusterName.get() == null) { assert response.getClusterName().value() != null; @@ -597,7 +596,6 @@ public void handleResponse(ClusterStateResponse response) { @Override public void handleException(TransportException exp) { - assert transportService.getThreadPool().getThreadContext().isSystemContext() == false : "context is a system context"; logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp); try { IOUtils.closeWhileHandlingException(connection); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index f75363c7ab5c7..20587d31f5359 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -34,12 +34,14 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -52,6 +54,7 @@ import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -168,6 +171,85 @@ public void onFailure(String source, Exception e) { nonMaster.close(); } + public void testThreadContext() throws InterruptedException { + final TimedMasterService master = createTimedMasterService(true); + final CountDownLatch latch = new CountDownLatch(1); + + try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { + final Map expectedHeaders = Collections.singletonMap("test", "test"); + threadPool.getThreadContext().putHeader(expectedHeaders); + + final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); + final TimeValue masterTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); + + master.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + @Override + public ClusterState execute(ClusterState currentState) { + assertTrue(threadPool.getThreadContext().isSystemContext()); + assertEquals(Collections.emptyMap(), threadPool.getThreadContext().getHeaders()); + + if (randomBoolean()) { + return ClusterState.builder(currentState).build(); + } else if (randomBoolean()) { + return currentState; + } else { + throw new IllegalArgumentException("mock failure"); + } + } + + @Override + public void onFailure(String source, Exception e) { + assertFalse(threadPool.getThreadContext().isSystemContext()); + assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders()); + latch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + assertFalse(threadPool.getThreadContext().isSystemContext()); + assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders()); + latch.countDown(); + } + + @Override + protected Void newResponse(boolean acknowledged) { + return null; + } + + public TimeValue ackTimeout() { + return ackTimeout; + } + + @Override + public TimeValue timeout() { + return masterTimeout; + } + + @Override + public void onAllNodesAcked(@Nullable Exception e) { + assertFalse(threadPool.getThreadContext().isSystemContext()); + assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders()); + latch.countDown(); + } + + @Override + public void onAckTimeout() { + assertFalse(threadPool.getThreadContext().isSystemContext()); + assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders()); + latch.countDown(); + } + + }); + + assertFalse(threadPool.getThreadContext().isSystemContext()); + assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders()); + } + + latch.await(); + + master.close(); + } + /* * test that a listener throwing an exception while handling a * notification does not prevent publication notification to the diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 5e145306f8c1f..85e5c99fe3581 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -293,7 +292,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { return this; } - public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadContext) { + public Builder putDatafeed(DatafeedConfig datafeedConfig, Map headers) { if (datafeeds.containsKey(datafeedConfig.getId())) { throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); } @@ -302,13 +301,13 @@ public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadCo Job job = jobs.get(jobId); DatafeedJobValidator.validate(datafeedConfig, job); - if (threadContext != null) { + if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); - Map headers = threadContext.getHeaders().entrySet().stream() + Map securityHeaders = headers.entrySet().stream() .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - builder.setHeaders(headers); + builder.setHeaders(securityHeaders); datafeedConfig = builder.build(); } @@ -328,7 +327,7 @@ private void checkJobIsAvailableForDatafeed(String jobId) { } } - public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, ThreadContext threadContext) { + public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, Map headers) { String datafeedId = update.getId(); DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); if (oldDatafeedConfig == null) { @@ -336,7 +335,7 @@ public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaDa } checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, DatafeedState.STARTED), datafeedId, persistentTasks); - DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, threadContext); + DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, headers); if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 444532a7e3f15..27498bd1549ee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -264,7 +263,7 @@ ChunkingConfig getChunkingConfig() { * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update */ - public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadContext) { + public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map headers) { if (id.equals(datafeedConfig.getId()) == false) { throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id"); } @@ -301,12 +300,12 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadC builder.setChunkingConfig(chunkingConfig); } - if (threadContext != null) { + if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context - Map headers = threadContext.getHeaders().entrySet().stream() + Map securityHeaders = headers.entrySet().stream() .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - builder.setHeaders(headers); + builder.setHeaders(securityHeaders); } return builder.build(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index d059e567d1588..358f9d1c97bd7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -114,7 +114,7 @@ public void testApply_failBecauseTargetDatafeedHasDifferentId() { public void testApply_givenEmptyUpdate() { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); - DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap()); assertThat(datafeed, equalTo(updatedDatafeed)); } @@ -125,7 +125,7 @@ public void testApply_givenPartialUpdate() { DatafeedUpdate.Builder updated = new DatafeedUpdate.Builder(datafeed.getId()); updated.setScrollSize(datafeed.getScrollSize() + 1); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); DatafeedConfig.Builder expectedDatafeed = new DatafeedConfig.Builder(datafeed); expectedDatafeed.setScrollSize(datafeed.getScrollSize() + 1); @@ -149,7 +149,7 @@ public void testApply_givenFullUpdateNoAggregations() { update.setScrollSize(8000); update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); assertThat(updatedDatafeed.getJobId(), equalTo("bar")); assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2"))); @@ -175,7 +175,7 @@ public void testApply_givenAggregations() { update.setAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1"))); assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1"))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 81f4a90f575af..ede92fbbab950 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -213,7 +212,7 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("Job [" + jobId + "] is successfully marked as deleted"); listener.onResponse(true); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 08a9dfb09c1d9..88c72578023f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.security.support.Exceptions; import java.io.IOException; +import java.util.Map; public class TransportPutDatafeedAction extends TransportMasterNodeAction { @@ -95,7 +96,7 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { - putDatafeed(request, listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); } } @@ -103,7 +104,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ HasPrivilegesResponse response, ActionListener listener) throws IOException { if (response.isCompleteMatch()) { - putDatafeed(request, listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); } else { XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -120,7 +121,8 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ } } - private void putDatafeed(PutDatafeedAction.Request request, ActionListener listener) { + private void putDatafeed(PutDatafeedAction.Request request, Map headers, + ActionListener listener) { clusterService.submitStateUpdateTask( "put-datafeed-" + request.getDatafeed().getId(), @@ -136,16 +138,16 @@ protected PutDatafeedAction.Response newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { - return putDatafeed(request, currentState); + return putDatafeed(request, headers, currentState); } }); } - private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) { + private ClusterState putDatafeed(PutDatafeedAction.Request request, Map headers, ClusterState clusterState) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build(); + .putDatafeed(request.getDatafeed(), headers).build(); return ClusterState.builder(clusterState).metaData( MetaData.builder(clusterState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build()) .build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 4d752fe294081..4e43cbb185330 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import java.util.Map; + public class TransportUpdateDatafeedAction extends TransportMasterNodeAction { @Inject @@ -50,6 +52,8 @@ protected PutDatafeedAction.Response newResponse() { @Override protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) { + final Map headers = threadPool.getThreadContext().getHeaders(); + clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(), new AckedClusterStateUpdateTask(request, listener) { private volatile DatafeedConfig updatedDatafeed; @@ -69,7 +73,7 @@ public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData persistentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .updateDatafeed(update, persistentTasks, threadPool.getThreadContext()).build(); + .updateDatafeed(update, persistentTasks, headers).build(); updatedDatafeed = newMetadata.getDatafeed(update.getId()); return ClusterState.builder(currentState).metaData( MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build()).build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1fd73b96667b2..fe6deea55e3aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; @@ -347,8 +346,8 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - afterClusterStateUpdate(clusterChangedEvent.state(), request); + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + afterClusterStateUpdate(newState, request); actionListener.onResponse(new PutJobAction.Response(updatedJob.get())); } }); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index f6fb2db3c9bb9..ecfe712858331 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -30,9 +30,11 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; @@ -42,6 +44,7 @@ import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -63,7 +66,7 @@ protected MlMetadata createTestInstance() { } job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build(); builder.putJob(job, false); - builder.putDatafeed(datafeedConfig, null); + builder.putDatafeed(datafeedConfig, Collections.emptyMap()); } else { builder.putJob(job, false); } @@ -164,7 +167,7 @@ public void testRemoveJob_failDatafeedRefersToJob() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); @@ -184,7 +187,7 @@ public void testCrudDatafeed() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -201,7 +204,7 @@ public void testPutDatafeed_failBecauseJobDoesNotExist() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", "missing-job").build(); MlMetadata.Builder builder = new MlMetadata.Builder(); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseJobIsBeingDeleted() { @@ -210,7 +213,7 @@ public void testPutDatafeed_failBecauseJobIsBeingDeleted() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { @@ -218,9 +221,9 @@ public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { @@ -229,10 +232,10 @@ public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.putDatafeed(datafeedConfig2, null)); + () -> builder.putDatafeed(datafeedConfig2, Collections.emptyMap())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -246,7 +249,23 @@ public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1.build(now), false); - expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); + } + + public void testPutDatafeed_setsSecurityHeaders() { + Job datafeedJob = createDatafeedJob().build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(datafeedJob, false); + + Map headers = new HashMap<>(); + headers.put("unrelated_header", "unrelated_header_value"); + headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); + builder.putDatafeed(datafeedConfig, headers); + MlMetadata metadata = builder.build(); + assertThat(metadata.getDatafeed("datafeed1").getHeaders().size(), equalTo(1)); + assertThat(metadata.getDatafeed("datafeed1").getHeaders(), + hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); } public void testUpdateDatafeed() { @@ -254,12 +273,13 @@ public void testUpdateDatafeed() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setScrollSize(5000); - MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null).build(); + MlMetadata updatedMetadata = + new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()).build(); DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId()); assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId())); @@ -271,7 +291,8 @@ public void testUpdateDatafeed() { public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id"); update.setScrollSize(5000); - expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, null).build()); + expectThrows(ResourceNotFoundException.class, + () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, Collections.emptyMap()).build()); } public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { @@ -279,7 +300,7 @@ public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -300,14 +321,14 @@ public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job1.getId() + "_2"); expectThrows(ResourceNotFoundException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); } public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { @@ -319,25 +340,46 @@ public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); builder.putJob(job2.build(), false); - builder.putDatafeed(datafeedConfig1, null); - builder.putDatafeed(datafeedConfig2, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); + builder.putDatafeed(datafeedConfig2, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job2.getId()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]")); } + public void testUpdateDatafeed_setsSecurityHeaders() { + Job datafeedJob = createDatafeedJob().build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(datafeedJob, false); + builder.putDatafeed(datafeedConfig, Collections.emptyMap()); + MlMetadata beforeMetadata = builder.build(); + assertTrue(beforeMetadata.getDatafeed("datafeed1").getHeaders().isEmpty()); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig.getId()); + update.setQueryDelay(TimeValue.timeValueMinutes(5)); + + Map headers = new HashMap<>(); + headers.put("unrelated_header", "unrelated_header_value"); + headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); + MlMetadata afterMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, headers).build(); + Map updatedHeaders = afterMetadata.getDatafeed("datafeed1").getHeaders(); + assertThat(updatedHeaders.size(), equalTo(1)); + assertThat(updatedHeaders, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); + } + public void testRemoveDatafeed_failBecauseDatafeedStarted() { Job job1 = createDatafeedJob().build(new Date()); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -378,9 +420,9 @@ public void testExpandJobIds() { public void testExpandDatafeedIds() { MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2"); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), null); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), null); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), null); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), Collections.emptyMap()); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), Collections.emptyMap()); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), Collections.emptyMap()); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -409,7 +451,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(entry.getValue(), true); } for (Map.Entry entry : datafeeds.entrySet()) { - metadataBuilder.putDatafeed(entry.getValue(), null); + metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } switch (between(0, 1)) { @@ -430,7 +472,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { } randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).build(); metadataBuilder.putJob(randomJob, false); - metadataBuilder.putDatafeed(datafeedConfig, null); + metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; default: throw new AssertionError("Illegal randomisation branch"); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index d65fc1476e75e..0e7ad29c54da9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -51,7 +51,7 @@ public void testValidate_datafeedIsStarted() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", - Collections.singletonList("*")), null); + Collections.singletonList("*")), Collections.emptyMap()); final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, JobState.OPENED, startDataFeedTaskBuilder); addTask("datafeed_id", 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index af9446ed972cb..72c8d361dd882 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -45,7 +45,7 @@ public void testValidate_jobClosed() { PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); Exception e = expectThrows(ElasticsearchStatusException.class, () -> TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); @@ -62,7 +62,7 @@ public void testValidate_jobOpening() { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); @@ -78,7 +78,7 @@ public void testValidate_jobOpened() { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index 55a0f4006bcdd..934642986de96 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -42,7 +42,7 @@ public void testValidate() { DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) - .putDatafeed(datafeedConfig, null) + .putDatafeed(datafeedConfig, Collections.emptyMap()) .build(); TransportStopDatafeedAction.validateDatafeedTask("foo", mlMetadata2); } @@ -54,12 +54,12 @@ public void testResolveDataFeedIds_GivenDatafeedId() { addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -86,17 +86,17 @@ public void testResolveDataFeedIds_GivenAll() { addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_3", 0L, "node-1", DatafeedState.STOPPING, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index f609f0c8c5ed9..6ce03d22b64f0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -84,7 +84,7 @@ public void setUpTests() { Job job = createDatafeedJob().build(new Date()); mlMetadata.putJob(job, false); DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); - mlMetadata.putDatafeed(datafeed, null); + mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 96ae3b5ef38b6..f3fa804bb27b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -68,7 +68,7 @@ public void testSelectNode_GivenJobIsOpened() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -86,7 +86,7 @@ public void testSelectNode_GivenJobIsOpening() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -106,7 +106,7 @@ public void testNoJobTask() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); tasks = PersistentTasksCustomMetaData.builder().build(); @@ -128,7 +128,7 @@ public void testSelectNode_GivenJobFailedOrClosed() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -156,7 +156,7 @@ public void testShardUnassigned() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -182,7 +182,7 @@ public void testShardNotAllActive() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -207,7 +207,8 @@ public void testIndexDoesntExist() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -231,7 +232,8 @@ public void testRemoteIndex() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -248,7 +250,7 @@ public void testSelectNode_jobTaskStale() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); String nodeId = randomBoolean() ? "node_id2" : null; @@ -286,7 +288,8 @@ public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index 357c2bc232552..14ec4813a749e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; @@ -47,7 +46,7 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { markAsDeletedLatch.countDown(); } }); @@ -90,7 +89,7 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { removeJobLatch.countDown(); } });