diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index c0d8d1ceab6d5..3bdc949752afa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -128,12 +129,21 @@ public void clusterChanged(ClusterChangedEvent event) { Version.CURRENT, changes.get().v1().size(), changes.get().v2().size()); - threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2())); + + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2())); + } } } } void updateTemplates(Map changes, Set deletions) { + if (threadPool.getThreadContext().isSystemContext() == false) { + throw new IllegalStateException("template updates from the template upgrade service should always happen in a system context"); + } + for (Map.Entry change : changes.entrySet()) { PutIndexTemplateRequest request = new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); @@ -141,7 +151,7 @@ void updateTemplates(Map changes, Set deletions) client.admin().indices().putTemplate(request, new ActionListener() { @Override public void onResponse(PutIndexTemplateResponse response) { - if(updatesInProgress.decrementAndGet() == 0) { + if (updatesInProgress.decrementAndGet() == 0) { logger.info("Finished upgrading templates to version {}", Version.CURRENT); } if (response.isAcknowledged() == false) { @@ -151,7 +161,7 @@ public void onResponse(PutIndexTemplateResponse response) { @Override public void onFailure(Exception e) { - if(updatesInProgress.decrementAndGet() == 0) { + if (updatesInProgress.decrementAndGet() == 0) { logger.info("Templates were upgraded to version {}", Version.CURRENT); } logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java index 2e82397767fc4..e46f2e06fe16d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -61,6 +62,7 @@ import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -188,9 +190,16 @@ public void testUpdateTemplates() { additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}")); } - TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, null, + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool, Collections.emptyList()); + IllegalStateException ise = expectThrows(IllegalStateException.class, () -> service.updateTemplates(additions, deletions)); + assertThat(ise.getMessage(), containsString("template upgrade service should always happen in a system context")); + + threadContext.markAsSystemContext(); service.updateTemplates(additions, deletions); int updatesInProgress = service.getUpdatesInProgress(); @@ -241,11 +250,14 @@ public void testClusterStateUpdate() { ); ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); ExecutorService executorService = mock(ExecutorService.class); when(threadPool.generic()).thenReturn(executorService); doAnswer(invocation -> { Object[] args = invocation.getArguments(); assert args.length == 1; + assertTrue(threadContext.isSystemContext()); Runnable runnable = (Runnable) args[0]; runnable.run(); updateInvocation.incrementAndGet();