diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 5a1fe00112ef9..4c3bb47d38617 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -802,7 +802,8 @@ public void testApiNamingConventions() throws Exception { "render_search_template", "scripts_painless_execute", "indices.simulate_template", - "indices.resolve_index" + "indices.resolve_index", + "indices.add_block" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 1a39bf7421d15..bc3da4745d11c 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -173,35 +173,6 @@ specific index module: for the <>. Defaults to `3`. -`index.blocks.read_only`:: - - Set to `true` to make the index and index metadata read only, `false` to - allow writes and metadata changes. - -`index.blocks.read_only_allow_delete`:: - - Similar to `index.blocks.read_only`, but also allows deleting the index to - make more resources available. The <> may add and remove this block automatically. - -Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately. - -IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <>. - -`index.blocks.read`:: - - Set to `true` to disable read operations against the index. - -`index.blocks.write`:: - - Set to `true` to disable data write operations against the index. Unlike `read_only`, - this setting does not affect metadata. For instance, you can close an index with a `write` - block, but not an index with a `read_only` block. - -`index.blocks.metadata`:: - - Set to `true` to disable index metadata reads and writes. - `index.max_refresh_listeners`:: Maximum number of refresh listeners available on each shard of the index. @@ -321,6 +292,8 @@ include::index-modules/analysis.asciidoc[] include::index-modules/allocation.asciidoc[] +include::index-modules/blocks.asciidoc[] + include::index-modules/mapper.asciidoc[] include::index-modules/merge.asciidoc[] diff --git a/docs/reference/index-modules/blocks.asciidoc b/docs/reference/index-modules/blocks.asciidoc new file mode 100644 index 0000000000000..3f66991d0687d --- /dev/null +++ b/docs/reference/index-modules/blocks.asciidoc @@ -0,0 +1,146 @@ +[[index-modules-blocks]] +== Index blocks + +Index blocks limit the kind of operations that are available on a certain +index. The blocks come in different flavours, allowing to block write, +read, or metadata operations. The blocks can be set / removed using dynamic +index settings, or can be added using a dedicated API, which also ensures +for write blocks that, once successfully returning to the user, all shards +of the index are properly accounting for the block, for example that all +in-flight writes to an index have been completed after adding the write +block. + +[discrete] +[[index-block-settings]] +=== Index block settings + +The following _dynamic_ index settings determine the blocks present on an +index: + +`index.blocks.read_only`:: + + Set to `true` to make the index and index metadata read only, `false` to + allow writes and metadata changes. + +`index.blocks.read_only_allow_delete`:: + + Similar to `index.blocks.read_only`, but also allows deleting the index to + make more resources available. The <> may add and remove this block automatically. + +Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately. + +IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <>. + +`index.blocks.read`:: + + Set to `true` to disable read operations against the index. + +`index.blocks.write`:: + + Set to `true` to disable data write operations against the index. Unlike `read_only`, + this setting does not affect metadata. For instance, you can close an index with a `write` + block, but you cannot close an index with a `read_only` block. + +`index.blocks.metadata`:: + + Set to `true` to disable index metadata reads and writes. + +[discrete] +[[add-index-block]] +=== Add index block API + +Adds an index block to an index. + +[source,console] +-------------------------------------------------- +PUT /twitter/_block/write +-------------------------------------------------- +// TEST[setup:twitter] + + +[discrete] +[[add-index-block-api-request]] +==== {api-request-title} + +`PUT //_block/` + + +[discrete] +[role="child_attributes"] +[[add-index-block-api-path-params]] +==== {api-path-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index] ++ +To add blocks to all indices, use `_all` or `*`. To disallow the adding +of blocks to indices with `_all` or wildcard expressions, +change the `action.destructive_requires_name` cluster setting to `true`. +You can update this setting in the `elasticsearch.yml` file +or using the <> API. +``:: +(Required, string) +Block type to add to the index. ++ +.Valid values for `` +[%collapsible%open] +==== +`metadata`:: +Disable metadata changes, such as closing the index. + +`read`:: +Disable read operations. + +`read_only`:: +Disable write operations and metadata changes. + +`read_only_allow_delete`:: +Disable write operations and metadata changes. +Document deletion is disabled. +However, index deletion is still allowed. + +`write`:: +Disable write operations. However, metadata changes are still allowed. +==== +[discrete] +[[add-index-block-api-query-params]] +==== {api-query-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=allow-no-indices] ++ +Defaults to `true`. + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=expand-wildcards] ++ +Defaults to `open`. + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-ignore-unavailable] + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms] + +[discrete] +[[add-index-block-api-example]] +==== {api-examples-title} + +The following example shows how to add an index block: + +[source,console] +-------------------------------------------------- +PUT /my_index/_block/write +-------------------------------------------------- +// TEST[s/^/PUT my_index\n/] + +The API returns following response: + +[source,console-result] +-------------------------------------------------- +{ + "acknowledged" : true, + "shards_acknowledged" : true, + "indices" : [ { + "name" : "my_index", + "blocked" : true + } ] +} +-------------------------------------------------- + diff --git a/docs/reference/indices/open-close.asciidoc b/docs/reference/indices/open-close.asciidoc index 41eb8687db30f..660e56771c24a 100644 --- a/docs/reference/indices/open-close.asciidoc +++ b/docs/reference/indices/open-close.asciidoc @@ -63,13 +63,20 @@ the current write index, the data stream must first be <> so that a new write index is created and then the previous write index can be closed. +// end::closed-index[] + +[[open-index-api-wait-for-active-shards]] ===== Wait For active shards +// tag::wait-for-active-shards[] + Because opening or closing an index allocates its shards, the <> setting on index creation applies to the `_open` and `_close` index actions as well. -// end::closed-index[] +// end::wait-for-active-shards[] + + [[open-index-api-path-params]] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json new file mode 100644 index 0000000000000..797e53bc2a2a7 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json @@ -0,0 +1,59 @@ +{ + "indices.add_block":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-blocks.html", + "description":"Adds a block to an index." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/{index}/_block/{block}", + "methods":[ + "PUT" + ], + "parts":{ + "index":{ + "type":"list", + "description":"A comma separated list of indices to add a block to" + }, + "block":{ + "type":"string", + "description":"The block to add (one of read, write, read_only, metadata, read_only_allow_delete)" + } + } + } + ] + }, + "params":{ + "timeout":{ + "type":"time", + "description":"Explicit operation timeout" + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master" + }, + "ignore_unavailable":{ + "type":"boolean", + "description":"Whether specified concrete indices should be ignored when unavailable (missing or closed)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "default":"open", + "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml new file mode 100644 index 0000000000000..f29af74dd4755 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml @@ -0,0 +1,33 @@ +--- +"Basic test for index blocks": + - skip: + version: " - 7.99.99" + reason: "index block APIs have only been made available in 8.0.0" + - do: + indices.create: + index: test_index + body: + settings: + number_of_replicas: 0 + + - do: + indices.add_block: + index: test_index + block: write + - is_true: acknowledged + + - do: + catch: /cluster_block_exception/ + index: + index: test_index + body: { foo: bar } + + - do: + search: + index: test_index + + - do: + indices.put_settings: + index: test_index + body: + index.blocks.write: false diff --git a/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java index ef0d0ccb4e6f1..1700a78ea7098 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java @@ -19,18 +19,48 @@ package org.elasticsearch.blocks; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.action.support.IndicesOptions.lenientExpandOpen; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; +import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_ACCURATE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) @@ -82,11 +112,11 @@ public void testIndexReadWriteMetadataBlocks() { canCreateIndex("test1"); canIndexDocument("test1"); client().admin().indices().prepareUpdateSettings("test1") - .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true)) + .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) .execute().actionGet(); canNotIndexDocument("test1"); client().admin().indices().prepareUpdateSettings("test1") - .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, false)) + .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, false)) .execute().actionGet(); canIndexDocument("test1"); } @@ -133,11 +163,319 @@ private void canNotIndexDocument(String index) { private void setIndexReadOnly(String index, Object value) { HashMap newSettings = new HashMap<>(); - newSettings.put(IndexMetadata.SETTING_READ_ONLY, value); + newSettings.put(SETTING_READ_ONLY, value); UpdateSettingsRequestBuilder settingsRequest = client().admin().indices().prepareUpdateSettings(index); settingsRequest.setSettings(newSettings); AcknowledgedResponse settingsResponse = settingsRequest.execute().actionGet(); assertThat(settingsResponse, notNullValue()); } + + + public void testAddBlocksWhileExistingBlocks() { + createIndex("test"); + ensureGreen("test"); + + for (APIBlock otherBlock : APIBlock.values()) { + + for (APIBlock block : Arrays.asList(APIBlock.READ, APIBlock.WRITE)) { + try { + enableIndexBlock("test", block.settingName()); + + // Adding a block is not blocked + AcknowledgedResponse addBlockResponse = client().admin().indices() + .prepareAddBlock(otherBlock, "test").get(); + assertAcked(addBlockResponse); + } finally { + disableIndexBlock("test", otherBlock.settingName()); + disableIndexBlock("test", block.settingName()); + } + } + + for (APIBlock block : Arrays.asList(APIBlock.READ_ONLY, APIBlock.METADATA, APIBlock.READ_ONLY_ALLOW_DELETE)) { + boolean success = false; + try { + enableIndexBlock("test", block.settingName()); + // Adding a block is blocked when there is a metadata block and the new block to be added is not a metadata block + if (block.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) && + otherBlock.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) == false) { + assertBlocked(client().admin().indices().prepareAddBlock(otherBlock, "test")); + } else { + assertAcked(client().admin().indices().prepareAddBlock(otherBlock, "test")); + success = true; + } + } finally { + if (success) { + disableIndexBlock("test", otherBlock.settingName()); + } + disableIndexBlock("test", block.settingName()); + } + } + } + } + + public void testAddBlockToMissingIndex() { + IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices() + .prepareAddBlock(randomFrom(APIBlock.values()),"test").get()); + assertThat(e.getMessage(), is("no such index [test]")); + } + + public void testAddBlockToOneMissingIndex() { + createIndex("test1"); + final IndexNotFoundException e = expectThrows(IndexNotFoundException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()),"test1", "test2").get()); + assertThat(e.getMessage(), is("no such index [test2]")); + } + + public void testCloseOneMissingIndexIgnoreMissing() throws Exception { + createIndex("test1"); + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertBusy(() -> assertAcked(client().admin().indices().prepareAddBlock(block, "test1", "test2") + .setIndicesOptions(lenientExpandOpen()))); + assertIndexHasBlock(block, "test1"); + } finally { + disableIndexBlock("test1", block); + } + } + + public void testAddBlockNoIndex() { + final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values())).get()); + assertThat(e.getMessage(), containsString("index is missing")); + } + + public void testAddBlockNullIndex() { + expectThrows(NullPointerException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()), (String[])null)); + } + + public void testAddIndexBlock() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final int nbDocs = randomIntBetween(0, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + + client().admin().indices().prepareRefresh(indexName).get(); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs); + } + + public void testSameBlockTwice() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10)) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + } + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + // Second add block should be acked too, even if it was a METADATA block + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testAddBlockToUnassignedIndex() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + assertAcked(prepareCreate(indexName) + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build())); + + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.metadata().indices().get(indexName).getState(), is(IndexMetadata.State.OPEN)); + assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); + + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testConcurrentAddBlock() throws InterruptedException { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final int nbDocs = randomIntBetween(10, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureYellowAndNoInitializingShards(indexName); + + final CountDownLatch startClosing = new CountDownLatch(1); + final Thread[] threads = new Thread[randomIntBetween(2, 5)]; + + final APIBlock block = randomFrom(APIBlock.values()); + + try { + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + startClosing.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + client().admin().indices().prepareAddBlock(block, indexName).get(); + assertIndexHasBlock(block, indexName); + } catch (final ClusterBlockException e) { + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + } + }); + threads[i].start(); + } + + startClosing.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testAddBlockWhileIndexingDocuments() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final APIBlock block = randomFrom(APIBlock.values()); + + int nbDocs = 0; + + try { + try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), 1000)) { + indexer.setFailureAssertion(t -> { + Throwable cause = ExceptionsHelper.unwrapCause(t); + assertThat(cause, instanceOf(ClusterBlockException.class)); + ClusterBlockException e = (ClusterBlockException) cause; + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + }); + + waitForDocs(randomIntBetween(10, 50), indexer); + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + indexer.stopAndAwaitStopped(); + nbDocs += indexer.totalIndexedDocs(); + } + + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + refresh(indexName); + assertHitCount(client().prepareSearch(indexName).setSize(0).setTrackTotalHitsUpTo(TRACK_TOTAL_HITS_ACCURATE).get(), nbDocs); + } + + public void testAddBlockWhileDeletingIndices() throws Exception { + final String[] indices = new String[randomIntBetween(3, 10)]; + for (int i = 0; i < indices.length; i++) { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, 10) + .mapToObj(n -> client().prepareIndex(indexName).setId(String.valueOf(n)).setSource("num", n)).collect(toList())); + } + indices[i] = indexName; + } + assertThat(client().admin().cluster().prepareState().get().getState().metadata().indices().size(), equalTo(indices.length)); + + final List threads = new ArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1); + + final APIBlock block = randomFrom(APIBlock.values()); + + Consumer exceptionConsumer = t -> { + Throwable cause = ExceptionsHelper.unwrapCause(t); + if (cause instanceof ClusterBlockException) { + ClusterBlockException e = (ClusterBlockException) cause; + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + } else { + assertThat(cause, instanceOf(IndexNotFoundException.class)); + } + }; + + try { + for (final String indexToDelete : indices) { + threads.add(new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + assertAcked(client().admin().indices().prepareDelete(indexToDelete)); + } catch (final Exception e) { + exceptionConsumer.accept(e); + } + })); + } + for (final String indexToBlock : indices) { + threads.add(new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + client().admin().indices().prepareAddBlock(block, indexToBlock).get(); + } catch (final Exception e) { + exceptionConsumer.accept(e); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + } finally { + for (final String indexToBlock : indices) { + try { + disableIndexBlock(indexToBlock, block); + } catch (IndexNotFoundException infe) { + // ignore + } + } + } + } + + static void assertIndexHasBlock(APIBlock block, final String... indices) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + final IndexMetadata indexMetadata = clusterState.metadata().indices().get(index); + final Settings indexSettings = indexMetadata.getSettings(); + assertThat(indexSettings.hasValue(block.settingName()), is(true)); + assertThat(indexSettings.getAsBoolean(block.settingName(), false), is(true)); + assertThat(clusterState.blocks().hasIndexBlock(index, block.getBlock()), is(true)); + assertThat("Index " + index + " must have only 1 block with [id=" + block.getBlock().id() + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == block.getBlock().id()).count(), equalTo(1L)); + } + } + + public static void disableIndexBlock(String index, APIBlock block) { + disableIndexBlock(index, block.settingName()); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index e38405d9c2fac..aa77b20cc17f9 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -136,6 +136,9 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction; import org.elasticsearch.action.admin.indices.open.OpenIndexAction; import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction; +import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; +import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; @@ -299,6 +302,7 @@ import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction; +import org.elasticsearch.rest.action.admin.indices.RestAddIndexBlockAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; @@ -539,6 +543,7 @@ public void reg actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class); actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class); actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class); + actions.register(AddIndexBlockAction.INSTANCE, TransportAddIndexBlockAction.class); actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class); actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class); actions.register(TransportGetFieldMappingsIndexAction.TYPE, TransportGetFieldMappingsIndexAction.class); @@ -635,6 +640,7 @@ public void reg actions.register(TransportNodesSnapshotsStatus.TYPE, TransportNodesSnapshotsStatus.class); actions.register(TransportNodesListGatewayMetaState.TYPE, TransportNodesListGatewayMetaState.class); actions.register(TransportVerifyShardBeforeCloseAction.TYPE, TransportVerifyShardBeforeCloseAction.class); + actions.register(TransportVerifyShardIndexBlockAction.TYPE, TransportVerifyShardIndexBlockAction.class); actions.register(TransportNodesListGatewayStartedShards.TYPE, TransportNodesListGatewayStartedShards.class); actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class); actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class); @@ -699,6 +705,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestDeleteIndexAction()); registerHandler.accept(new RestCloseIndexAction()); registerHandler.accept(new RestOpenIndexAction()); + registerHandler.accept(new RestAddIndexBlockAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java new file mode 100644 index 0000000000000..a993ba5b7004d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.ActionType; + +public class AddIndexBlockAction extends ActionType { + + public static final AddIndexBlockAction INSTANCE = new AddIndexBlockAction(); + public static final String NAME = "indices:admin/block/add"; + + private AddIndexBlockAction() { + super(NAME, AddIndexBlockResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java new file mode 100644 index 0000000000000..92f5329c069c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; + +/** + * Cluster state update request that allows to add a block to one or more indices + */ +public class AddIndexBlockClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { + + private final APIBlock block; + private long taskId; + + public AddIndexBlockClusterStateUpdateRequest(final APIBlock block, final long taskId) { + this.block = block; + this.taskId = taskId; + } + + public long taskId() { + return taskId; + } + + public APIBlock getBlock() { + return block; + } + + public AddIndexBlockClusterStateUpdateRequest taskId(final long taskId) { + this.taskId = taskId; + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java new file mode 100644 index 0000000000000..72e8707f969ed --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.CollectionUtils; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * A request to add a block to an index. + */ +public class AddIndexBlockRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private final APIBlock block; + private String[] indices; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + + public AddIndexBlockRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + block = APIBlock.readFrom(in); + } + + /** + * Constructs a new request for the specified block and indices + */ + public AddIndexBlockRequest(APIBlock block, String... indices) { + this.block = Objects.requireNonNull(block); + this.indices = Objects.requireNonNull(indices); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (CollectionUtils.isEmpty(indices)) { + validationException = addValidationError("index is missing", validationException); + } + return validationException; + } + + /** + * Returns the indices to be blocked + */ + @Override + public String[] indices() { + return indices; + } + + /** + * Sets the indices to be blocked + * @param indices the indices to be blocked + * @return the request itself + */ + @Override + public AddIndexBlockRequest indices(String... indices) { + this.indices = Objects.requireNonNull(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and how to deal with wildcard expressions. + * For example indices that don't exist. + * + * @return the desired behaviour regarding indices to ignore and wildcard indices expressions + */ + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + /** + * Specifies what type of requested indices to ignore and how to deal wild wildcard expressions. + * For example indices that don't exist. + * + * @param indicesOptions the desired behaviour regarding indices to ignore and wildcard indices expressions + * @return the request itself + */ + public AddIndexBlockRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * Returns the block to be added + */ + public APIBlock getBlock() { + return block; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + block.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java new file mode 100644 index 0000000000000..ba2ba5434d7cb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; + +/** + * Builder for add index block request + */ +public class AddIndexBlockRequestBuilder + extends AcknowledgedRequestBuilder { + + public AddIndexBlockRequestBuilder(ElasticsearchClient client, AddIndexBlockAction action, APIBlock block, String... indices) { + super(client, action, new AddIndexBlockRequest(block, indices)); + } + + /** + * Sets the indices to be blocked + * + * @param indices the indices to be blocked + * @return the request itself + */ + public AddIndexBlockRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and wildcard indices expressions + * For example indices that don't exist. + * + * @param indicesOptions the desired behaviour regarding indices to ignore and indices wildcard expressions + * @return the request itself + */ + public AddIndexBlockRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { + request.indicesOptions(indicesOptions); + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java new file mode 100644 index 0000000000000..ac323e3ce4e0b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java @@ -0,0 +1,275 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.unmodifiableList; + +public class AddIndexBlockResponse extends ShardsAcknowledgedResponse { + + private final List indices; + + AddIndexBlockResponse(StreamInput in) throws IOException { + super(in, true); + indices = unmodifiableList(in.readList(AddBlockResult::new)); + } + + public AddIndexBlockResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List indices) { + super(acknowledged, shardsAcknowledged); + this.indices = unmodifiableList(Objects.requireNonNull(indices)); + } + + public List getIndices() { + return indices; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeShardsAcknowledged(out); + out.writeList(indices); + } + + @Override + protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startArray("indices"); + for (AddBlockResult index : indices) { + index.toXContent(builder, params); + } + builder.endArray(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class AddBlockResult implements Writeable, ToXContentFragment { + + private final Index index; + private final @Nullable Exception exception; + private final @Nullable AddBlockShardResult[] shards; + + public AddBlockResult(final Index index) { + this(index, null, null); + } + + public AddBlockResult(final Index index, final Exception failure) { + this(index, Objects.requireNonNull(failure), null); + } + + public AddBlockResult(final Index index, final AddBlockShardResult[] shards) { + this(index, null, Objects.requireNonNull(shards)); + } + + private AddBlockResult(final Index index, @Nullable final Exception exception, @Nullable final AddBlockShardResult[] shards) { + this.index = Objects.requireNonNull(index); + this.exception = exception; + this.shards = shards; + } + + AddBlockResult(final StreamInput in) throws IOException { + this.index = new Index(in); + this.exception = in.readException(); + this.shards = in.readOptionalArray(AddBlockShardResult::new, AddBlockShardResult[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + index.writeTo(out); + out.writeException(exception); + out.writeOptionalArray(shards); + } + + public Index getIndex() { + return index; + } + + public Exception getException() { + return exception; + } + + public AddBlockShardResult[] getShards() { + return shards; + } + + public boolean hasFailures() { + if (exception != null) { + return true; + } + if (shards != null) { + for (AddBlockShardResult shard : shards) { + if (shard.hasFailures()) { + return true; + } + } + } + return false; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("name", index.getName()); + if (hasFailures()) { + if (exception != null) { + builder.startObject("exception"); + ElasticsearchException.generateFailureXContent(builder, params, exception, true); + builder.endObject(); + } else { + builder.startArray("failed_shards"); + for (AddBlockShardResult shard : shards) { + if (shard.hasFailures()) { + shard.toXContent(builder, params); + } + } + builder.endArray(); + } + } else { + builder.field("blocked", true); + } + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + } + + public static class AddBlockShardResult implements Writeable, ToXContentFragment { + + private final int id; + private final Failure[] failures; + + public AddBlockShardResult(final int id, final Failure[] failures) { + this.id = id; + this.failures = failures; + } + + AddBlockShardResult(final StreamInput in) throws IOException { + this.id = in.readVInt(); + this.failures = in.readOptionalArray(Failure::readFailure, Failure[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeVInt(id); + out.writeOptionalArray(failures); + } + + public boolean hasFailures() { + return CollectionUtils.isEmpty(failures) == false; + } + + public int getId() { + return id; + } + + public Failure[] getFailures() { + return failures; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("id", String.valueOf(id)); + builder.startArray("failures"); + if (failures != null) { + for (Failure failure : failures) { + failure.toXContent(builder, params); + } + } + builder.endArray(); + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Failure extends DefaultShardOperationFailedException { + + private @Nullable String nodeId; + + private Failure(StreamInput in) throws IOException { + super(in); + nodeId = in.readOptionalString(); + } + + public Failure(final String index, final int shardId, final Throwable reason) { + this(index, shardId, reason, null); + } + + public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) { + super(index, shardId, reason); + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(nodeId); + } + + @Override + public XContentBuilder innerToXContent(final XContentBuilder builder, final Params params) throws IOException { + if (nodeId != null) { + builder.field("node", nodeId); + } + return super.innerToXContent(builder, params); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static Failure readFailure(final StreamInput in) throws IOException { + return new Failure(in); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java new file mode 100644 index 0000000000000..b831882549846 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataIndexStateService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.Index; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; + +/** + * Adds a single index level block to a given set of indices. Not only does it set the correct setting, + * but it ensures that, in case of a write block, once successfully returning to the user, all shards + * of the index are properly accounting for the block, for instance, when adding a write block all + * in-flight writes to an index have been completed prior to the response being returned. These actions + * are done in multiple cluster state updates (at least two). See also {@link TransportVerifyShardIndexBlockAction} + * for the eventual delegation for shard-level verification. + */ +public class TransportAddIndexBlockAction extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportAddIndexBlockAction.class); + + private final MetadataIndexStateService indexStateService; + private final DestructiveOperations destructiveOperations; + + @Inject + public TransportAddIndexBlockAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, MetadataIndexStateService indexStateService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + DestructiveOperations destructiveOperations) { + super(AddIndexBlockAction.NAME, transportService, clusterService, threadPool, actionFilters, AddIndexBlockRequest::new, + indexNameExpressionResolver); + this.indexStateService = indexStateService; + this.destructiveOperations = destructiveOperations; + } + + @Override + protected String executor() { + // no need to use a thread pool, we go async right away + return ThreadPool.Names.SAME; + } + + @Override + protected AddIndexBlockResponse read(StreamInput in) throws IOException { + return new AddIndexBlockResponse(in); + } + + @Override + protected void doExecute(Task task, AddIndexBlockRequest request, ActionListener listener) { + destructiveOperations.failDestructive(request.indices()); + super.doExecute(task, request, listener); + } + + @Override + protected ClusterBlockException checkBlock(AddIndexBlockRequest request, ClusterState state) { + if (request.getBlock().getBlock().levels().contains(ClusterBlockLevel.METADATA_WRITE) && + state.blocks().global(ClusterBlockLevel.METADATA_WRITE).isEmpty()) { + return null; + } + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request)); + } + + @Override + protected void masterOperation(final Task task, + final AddIndexBlockRequest request, + final ClusterState state, + final ActionListener listener) throws Exception { + final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + if (concreteIndices == null || concreteIndices.length == 0) { + listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList())); + return; + } + + final AddIndexBlockClusterStateUpdateRequest addBlockRequest = new AddIndexBlockClusterStateUpdateRequest(request.getBlock(), + task.getId()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()) + .indices(concreteIndices); + indexStateService.addIndexBlock(addBlockRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug(() -> new ParameterizedMessage("failed to mark indices as readonly [{}]", (Object) concreteIndices), t); + delegatedListener.onFailure(t); + })); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java new file mode 100644 index 0000000000000..73990cb3ab75e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +/** + * Action used to verify whether shards have properly applied a given index block, + * and are no longer executing any operations in violation of that block. This action + * requests all operation permits of the shard in order to wait for all write operations + * to complete. + */ +public class TransportVerifyShardIndexBlockAction extends TransportReplicationAction< + TransportVerifyShardIndexBlockAction.ShardRequest, TransportVerifyShardIndexBlockAction.ShardRequest, ReplicationResponse> { + + public static final String NAME = AddIndexBlockAction.NAME + "[s]"; + public static final ActionType TYPE = new ActionType<>(NAME, ReplicationResponse::new); + protected Logger logger = LogManager.getLogger(getClass()); + + @Inject + public TransportVerifyShardIndexBlockAction(final Settings settings, final TransportService transportService, + final ClusterService clusterService, final IndicesService indicesService, + final ThreadPool threadPool, final ShardStateAction stateAction, + final ActionFilters actionFilters) { + super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, + ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT); + } + + @Override + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); + } + + @Override + protected void acquirePrimaryOperationPermit(final IndexShard primary, + final ShardRequest request, + final ActionListener onAcquired) { + primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout()); + } + + @Override + protected void acquireReplicaOperationPermit(final IndexShard replica, + final ShardRequest request, + final ActionListener onAcquired, + final long primaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdateOrDeletes) { + replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout()); + } + + @Override + protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, primary); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + }); + } + + @Override + protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, replica); + return new ReplicaResult(); + }); + } + + private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { + final ShardId shardId = indexShard.shardId(); + if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) { + throw new IllegalStateException("index shard " + shardId + + " is not blocking all operations while waiting for block " + request.clusterBlock()); + } + + final ClusterBlocks clusterBlocks = clusterService.state().blocks(); + if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { + throw new IllegalStateException("index shard " + shardId + " has not applied block " + request.clusterBlock()); + } + } + + @Override + protected ReplicationOperation.Replicas newReplicasProxy() { + return new VerifyShardReadOnlyActionReplicasProxy(); + } + + /** + * A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification + * and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary + * or reopened in an unverified state with potential non flushed translog operations. + */ + class VerifyShardReadOnlyActionReplicasProxy extends ReplicasProxy { + @Override + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm, + final ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); + } + } + + public static class ShardRequest extends ReplicationRequest { + + private final ClusterBlock clusterBlock; + + ShardRequest(StreamInput in) throws IOException { + super(in); + clusterBlock = new ClusterBlock(in); + } + + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { + super(shardId); + this.clusterBlock = Objects.requireNonNull(clusterBlock); + setParentTask(parentTaskId); + } + + @Override + public String toString() { + return "verify shard " + shardId + " before block with " + clusterBlock; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + clusterBlock.writeTo(out); + } + + public ClusterBlock clusterBlock() { + return clusterBlock; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index a113a09193057..ebe5616664ce9 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -21,9 +21,6 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; @@ -40,6 +37,9 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -62,6 +62,9 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; @@ -106,6 +109,7 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.common.Nullable; /** @@ -284,6 +288,23 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void open(OpenIndexRequest request, ActionListener listener); + /** + * Adds a block to an index + * + * @param block The block to add + * @param indices The name of the indices to add the block to + */ + AddIndexBlockRequestBuilder prepareAddBlock(APIBlock block, String... indices); + + /** + * Adds a block to an index + * + * @param request The add index block request + * @param listener A listener to be notified with a result + * @see org.elasticsearch.client.Requests#openIndexRequest(String) + */ + void addBlock(AddIndexBlockRequest request, ActionListener listener); + /** * Opens one or more indices based on their index name. * diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 21b25fb07b2f8..cb5f098d44e83 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,9 +30,6 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; -import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -166,6 +163,9 @@ import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; @@ -196,6 +196,10 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder; @@ -331,6 +335,7 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.FilterClient; import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -1368,6 +1373,16 @@ public void open(final OpenIndexRequest request, final ActionListener listener) { + execute(AddIndexBlockAction.INSTANCE, request, listener); + } + @Override public OpenIndexRequestBuilder prepareOpen(String... indices) { return new OpenIndexRequestBuilder(this, OpenIndexAction.INSTANCE, indices); diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index 66ff5099e0b96..f65e223873dfb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -81,6 +81,7 @@ public int id() { return this.id; } + @Nullable public String uuid() { return uuid; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 3728d1630bde3..a5c3a48d78e00 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -188,25 +189,80 @@ public Iterator> settings() { public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; - public static final String SETTING_READ_ONLY = "index.blocks.read_only"; - public static final Setting INDEX_READ_ONLY_SETTING = - Setting.boolSetting(SETTING_READ_ONLY, false, Property.Dynamic, Property.IndexScope); - public static final String SETTING_BLOCKS_READ = "index.blocks.read"; - public static final Setting INDEX_BLOCKS_READ_SETTING = - Setting.boolSetting(SETTING_BLOCKS_READ, false, Property.Dynamic, Property.IndexScope); + public enum APIBlock implements Writeable { + READ_ONLY("read_only", INDEX_READ_ONLY_BLOCK), + READ("read", INDEX_READ_BLOCK), + WRITE("write", INDEX_WRITE_BLOCK), + METADATA("metadata", INDEX_METADATA_BLOCK), + READ_ONLY_ALLOW_DELETE("read_only_allow_delete", INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); - public static final String SETTING_BLOCKS_WRITE = "index.blocks.write"; - public static final Setting INDEX_BLOCKS_WRITE_SETTING = - Setting.boolSetting(SETTING_BLOCKS_WRITE, false, Property.Dynamic, Property.IndexScope); + final String name; + final String settingName; + final Setting setting; + final ClusterBlock block; - public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata"; - public static final Setting INDEX_BLOCKS_METADATA_SETTING = - Setting.boolSetting(SETTING_BLOCKS_METADATA, false, Property.Dynamic, Property.IndexScope); + APIBlock(String name, ClusterBlock block) { + this.name = name; + this.settingName = "index.blocks." + name; + this.setting = Setting.boolSetting(settingName, false, Property.Dynamic, Property.IndexScope); + this.block = block; + } + + public String settingName() { + return settingName; + } + + public Setting setting() { + return setting; + } + + public ClusterBlock getBlock() { + return block; + } + + public static APIBlock fromName(String name) { + for (APIBlock block : APIBlock.values()) { + if (block.name.equals(name)) { + return block; + } + } + throw new IllegalArgumentException("No block found with name " + name); + } + + public static APIBlock fromSetting(String settingName) { + for (APIBlock block : APIBlock.values()) { + if (block.settingName.equals(settingName)) { + return block; + } + } + throw new IllegalArgumentException("No block found with setting name " + settingName); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(ordinal()); + } + + public static APIBlock readFrom(StreamInput input) throws IOException { + return APIBlock.values()[input.readVInt()]; + } + } + + public static final String SETTING_READ_ONLY = APIBlock.READ_ONLY.settingName(); + public static final Setting INDEX_READ_ONLY_SETTING = APIBlock.READ_ONLY.setting(); + + public static final String SETTING_BLOCKS_READ = APIBlock.READ.settingName(); + public static final Setting INDEX_BLOCKS_READ_SETTING = APIBlock.READ.setting(); + + public static final String SETTING_BLOCKS_WRITE = APIBlock.WRITE.settingName(); + public static final Setting INDEX_BLOCKS_WRITE_SETTING = APIBlock.WRITE.setting(); + + public static final String SETTING_BLOCKS_METADATA = APIBlock.METADATA.settingName(); + public static final Setting INDEX_BLOCKS_METADATA_SETTING = APIBlock.METADATA.setting(); - public static final String SETTING_READ_ONLY_ALLOW_DELETE = "index.blocks.read_only_allow_delete"; - public static final Setting INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = - Setting.boolSetting(SETTING_READ_ONLY_ALLOW_DELETE, false, Property.Dynamic, Property.IndexScope); + public static final String SETTING_READ_ONLY_ALLOW_DELETE = APIBlock.READ_ONLY_ALLOW_DELETE.settingName(); + public static final Setting INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = APIBlock.READ_ONLY_ALLOW_DELETE.setting(); public static final String SETTING_VERSION_CREATED = "index.version.created"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index cfd850c0b7b15..61b4a224e9ad9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockResult; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockShardResult; +import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.node.NodeClient; @@ -45,6 +50,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -92,7 +98,7 @@ import static java.util.Collections.unmodifiableMap; /** - * Service responsible for submitting open/close index requests + * Service responsible for submitting open/close index requests as well as for adding index blocks */ public class MetadataIndexStateService { private static final Logger logger = LogManager.getLogger(MetadataIndexStateService.class); @@ -304,6 +310,176 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map> addIndexBlock(final Index[] indices, final ClusterState currentState, + final APIBlock block) { + final Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + + final Set indicesToAddBlock = new HashSet<>(); + for (Index index : indices) { + metadata.getSafe(index); // to check if index exists + if (currentState.blocks().hasIndexBlock(index.getName(), block.block)) { + logger.debug("index {} already has block {}, ignoring", index, block.block); + } else { + indicesToAddBlock.add(index); + } + } + + if (indicesToAddBlock.isEmpty()) { + return Tuple.tuple(currentState, Collections.emptyMap()); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + final Map blockedIndices = new HashMap<>(); + + for (Index index : indicesToAddBlock) { + ClusterBlock indexBlock = null; + final Set clusterBlocks = currentState.blocks().indices().get(index.getName()); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == block.block.id()) { + // Reuse the existing UUID-based block + indexBlock = clusterBlock; + break; + } + } + } + if (indexBlock == null) { + // Create a new UUID-based block + indexBlock = createUUIDBasedBlock(block.block); + } + assert Strings.hasLength(indexBlock.uuid()) : "Block should have a UUID"; + blocks.addIndexBlock(index.getName(), indexBlock); + blockedIndices.put(index, indexBlock); + // update index settings as well to match the block + final IndexMetadata indexMetadata = metadata.getSafe(index); + if (block.setting().get(indexMetadata.getSettings()) == false) { + final Settings updatedSettings = Settings.builder() + .put(indexMetadata.getSettings()).put(block.settingName(), true).build(); + + metadata.put(IndexMetadata.builder(indexMetadata) + .settings(updatedSettings) + .settingsVersion(indexMetadata.getSettingsVersion() + 1)); + } + } + + logger.info("adding block {} to indices {}", block.name, + blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.toList())); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata) + .routingTable(routingTable.build()).build(), blockedIndices); + } + + /** + * Adds an index block based on the given request, and notifies the listener upon completion. + * Adding blocks is done in three steps: + * - First, a temporary UUID-based block is added to the index + * (see {@link #addIndexBlock(Index[], ClusterState, APIBlock)}. + * - Second, shards are checked to have properly applied the UUID-based block. + * (see {@link WaitForBlocksApplied}). + * - Third, the temporary UUID-based block is turned into a full block + * (see {@link #finalizeBlock(ClusterState, Map, Map, APIBlock)}. + * Using this three-step process ensures non-interference by other operations in case where + * we notify successful completion here. + */ + public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request, + ActionListener listener) { + final Index[] concreteIndices = request.indices(); + if (concreteIndices == null || concreteIndices.length == 0) { + throw new IllegalArgumentException("Index name is required"); + } + List writeIndices = new ArrayList<>(); + SortedMap lookup = clusterService.state().metadata().getIndicesLookup(); + for (Index index : concreteIndices) { + IndexAbstraction ia = lookup.get(index.getName()); + if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) { + writeIndices.add(index.getName()); + } + } + if (writeIndices.size() > 0) { + throw new IllegalArgumentException("cannot add a block to the following data stream write indices [" + + Strings.collectionToCommaDelimitedString(writeIndices) + "]"); + } + + clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices), + new ClusterStateUpdateTask(Priority.URGENT) { + + private Map blockedIndices; + + @Override + public ClusterState execute(final ClusterState currentState) { + final Tuple> tup = + addIndexBlock(concreteIndices, currentState, request.getBlock()); + blockedIndices = tup.v2(); + return tup.v1(); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + if (oldState == newState) { + assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; + listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList())); + } else { + assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; + threadPool.executor(ThreadPool.Names.MANAGEMENT) + .execute(new WaitForBlocksApplied(blockedIndices, request, + ActionListener.wrap(verifyResults -> + clusterService.submitStateUpdateTask("finalize-index-block-[" + request.getBlock().name + + "]-[" + blockedIndices.keySet().stream().map(Index::getName) + .collect(Collectors.joining(", ")) + "]", + new ClusterStateUpdateTask(Priority.URGENT) { + private final List indices = new ArrayList<>(); + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + Tuple> addBlockResult = + finalizeBlock(currentState, blockedIndices, verifyResults, request.getBlock()); + assert verifyResults.size() == addBlockResult.v2().size(); + indices.addAll(addBlockResult.v2()); + return addBlockResult.v1(); + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(final String source, + final ClusterState oldState, + final ClusterState newState) { + + final boolean acknowledged = indices.stream().noneMatch( + AddBlockResult::hasFailures); + listener.onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices)); + } + }), + listener::onFailure) + ) + ); + } + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + } + ); + } + /** * Step 2 - Wait for indices to be ready for closing *

@@ -429,6 +605,112 @@ public void onFailure(Exception e) { } } + /** + * Helper class that coordinates with shards to ensure that blocks have been properly applied to all shards using + * {@link TransportVerifyShardIndexBlockAction}. + */ + class WaitForBlocksApplied extends ActionRunnable> { + + private final Map blockedIndices; + private final AddIndexBlockClusterStateUpdateRequest request; + + private WaitForBlocksApplied(final Map blockedIndices, + final AddIndexBlockClusterStateUpdateRequest request, + final ActionListener> listener) { + super(listener); + if (blockedIndices == null || blockedIndices.isEmpty()) { + throw new IllegalArgumentException("Cannot wait for blocks to be applied, list of blocked indices is empty or null"); + } + this.blockedIndices = blockedIndices; + this.request = request; + } + + @Override + protected void doRun() throws Exception { + final Map results = ConcurrentCollections.newConcurrentMap(); + final CountDown countDown = new CountDown(blockedIndices.size()); + final ClusterState state = clusterService.state(); + blockedIndices.forEach((index, block) -> { + waitForShardsReady(index, block, state, response -> { + results.put(index, response); + if (countDown.countDown()) { + listener.onResponse(unmodifiableMap(results)); + } + }); + }); + } + + private void waitForShardsReady(final Index index, + final ClusterBlock clusterBlock, + final ClusterState state, + final Consumer onResponse) { + final IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata == null) { + logger.debug("index {} has since been deleted, ignoring", index); + onResponse.accept(new AddBlockResult(index)); + return; + } + final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); + if (indexRoutingTable == null || indexMetadata.getState() == IndexMetadata.State.CLOSE) { + logger.debug("index {} is closed, no need to wait for shards, ignoring", index); + onResponse.accept(new AddBlockResult(index)); + return; + } + + final ImmutableOpenIntMap shards = indexRoutingTable.getShards(); + final AtomicArray results = new AtomicArray<>(shards.size()); + final CountDown countDown = new CountDown(shards.size()); + + for (IntObjectCursor shard : shards) { + final IndexShardRoutingTable shardRoutingTable = shard.value; + final int shardId = shardRoutingTable.shardId().id(); + sendVerifyShardBlockRequest(shardRoutingTable, clusterBlock, new NotifyOnceListener() { + @Override + public void innerOnResponse(final ReplicationResponse replicationResponse) { + AddBlockShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures()) + .map(f -> new AddBlockShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId())) + .toArray(AddBlockShardResult.Failure[]::new); + results.setOnce(shardId, new AddBlockShardResult(shardId, failures)); + processIfFinished(); + } + + @Override + public void innerOnFailure(final Exception e) { + AddBlockShardResult.Failure failure = new AddBlockShardResult.Failure(index.getName(), shardId, e); + results.setOnce(shardId, new AddBlockShardResult(shardId, new AddBlockShardResult.Failure[]{failure})); + processIfFinished(); + } + + private void processIfFinished() { + if (countDown.countDown()) { + onResponse.accept(new AddBlockResult(index, results.toArray(new AddBlockShardResult[results.length()]))); + } + } + }); + } + } + + private void sendVerifyShardBlockRequest(final IndexShardRoutingTable shardRoutingTable, + final ClusterBlock block, + final ActionListener listener) { + final ShardId shardId = shardRoutingTable.shardId(); + if (shardRoutingTable.primaryShard().unassigned()) { + logger.debug("primary shard {} is unassigned, ignoring", shardId); + final ReplicationResponse response = new ReplicationResponse(); + response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size())); + listener.onResponse(response); + return; + } + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); + final TransportVerifyShardIndexBlockAction.ShardRequest shardRequest = + new TransportVerifyShardIndexBlockAction.ShardRequest(shardId, block, parentTaskId); + if (request.ackTimeout() != null) { + shardRequest.timeout(request.ackTimeout()); + } + client.executeLocally(TransportVerifyShardIndexBlockAction.TYPE, shardRequest, listener); + } + } + /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ @@ -618,6 +900,68 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState) return ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); } + /** + * Finalizes the addition of blocks by turning the temporary UUID-based blocks into full blocks. + * @param currentState the cluster state to update + * @param blockedIndices the indices and their temporary UUID-based blocks to convert + * @param verifyResult the index-level results for adding the block + * @param block the full block to convert to + * @return the updated cluster state, as well as the (failed and successful) index-level results for adding the block + */ + static Tuple> finalizeBlock(final ClusterState currentState, + final Map blockedIndices, + final Map verifyResult, + final APIBlock block) { + + final Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + + final Set effectivelyBlockedIndices = new HashSet<>(); + Map blockingResults = new HashMap<>(verifyResult); + for (Map.Entry result : verifyResult.entrySet()) { + final Index index = result.getKey(); + final boolean acknowledged = result.getValue().hasFailures() == false; + try { + if (acknowledged == false) { + logger.debug("verification of shards before blocking {} failed [{}]", index, result); + continue; + } + final IndexMetadata indexMetadata = metadata.getSafe(index); + final ClusterBlock tempBlock = blockedIndices.get(index); + assert tempBlock != null; + assert tempBlock.uuid() != null; + final ClusterBlock currentBlock = currentState.blocks().getIndexBlockWithId(index.getName(), tempBlock.id()); + if (currentBlock != null && currentBlock.equals(block.block)) { + logger.debug("verification of shards for {} succeeded, but block finalization already occurred" + + " (possibly for another block) [{}]", index, result); + continue; + } + + if (currentBlock == null || currentBlock.equals(tempBlock) == false) { + // we should report error in this case as the index can be left as open. + blockingResults.put(result.getKey(), new AddBlockResult(result.getKey(), new IllegalStateException( + "verification of shards before blocking " + index + " succeeded but block has been removed in the meantime"))); + logger.debug("verification of shards before blocking {} succeeded but block has been removed in the meantime", index); + continue; + } + + assert currentBlock != null && currentBlock.equals(tempBlock) && currentBlock.id() == block.block.id(); + + blocks.removeIndexBlockWithId(index.getName(), tempBlock.id()); + blocks.addIndexBlock(index.getName(), block.block); + + logger.debug("add block {} to index {} succeeded", block.block, index); + effectivelyBlockedIndices.add(index.getName()); + } catch (final IndexNotFoundException e) { + logger.debug("index {} has been deleted since blocking it started, ignoring", index); + } + } + logger.info("completed adding block {} to indices {}", block.name, effectivelyBlockedIndices); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata).routingTable(routingTable.build()).build(), + blockingResults.values()); + } + /** * @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID. @@ -633,4 +977,12 @@ public static boolean isIndexVerifiedBeforeClosed(final IndexMetadata indexMetad && VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetadata.getSettings()) && VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetadata.getSettings()); } + + // Create UUID based block based on non-UUID one + public static ClusterBlock createUUIDBasedBlock(ClusterBlock clusterBlock) { + assert clusterBlock.uuid() == null : "no UUID expected on source block"; + return new ClusterBlock(clusterBlock.id(), UUIDs.randomBase64UUID(), "moving to block " + clusterBlock.description(), + clusterBlock.retryable(), clusterBlock.disableStatePersistence(), clusterBlock.isAllowReleaseResources(), clusterBlock.status(), + clusterBlock.levels()); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index a797f0abbe461..a308af9222331 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -178,16 +178,9 @@ public ClusterState execute(ClusterState currentState) { } ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_BLOCK, - IndexMetadata.INDEX_READ_ONLY_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK, - IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_METADATA_BLOCK, - IndexMetadata.INDEX_BLOCKS_METADATA_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_WRITE_BLOCK, - IndexMetadata.INDEX_BLOCKS_WRITE_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_BLOCK, - IndexMetadata.INDEX_BLOCKS_READ_SETTING, openSettings); + for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { + maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); + } if (!openIndices.isEmpty()) { for (Index index : openIndices) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java new file mode 100644 index 0000000000000..47201b29966cd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +public class RestAddIndexBlockAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of( + new Route(PUT, "/{index}/_block/{block}")); + } + + @Override + public String getName() { + return "add_index_block_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest( + IndexMetadata.APIBlock.fromName(request.param("block")), + Strings.splitStringByCommaToArray(request.param("index"))); + addIndexBlockRequest.masterNodeTimeout(request.paramAsTime("master_timeout", addIndexBlockRequest.masterNodeTimeout())); + addIndexBlockRequest.timeout(request.paramAsTime("timeout", addIndexBlockRequest.timeout())); + addIndexBlockRequest.indicesOptions(IndicesOptions.fromRequest(request, addIndexBlockRequest.indicesOptions())); + return channel -> client.admin().indices().addBlock(addIndexBlockRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 9c7d779d8c163..6de472e082e32 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1470,8 +1470,12 @@ public static void disableIndexBlock(String index, String block) { /** Enables an index block for the specified index */ public static void enableIndexBlock(String index, String block) { - Settings settings = Settings.builder().put(block, true).build(); - client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + if (randomBoolean()) { + Settings settings = Settings.builder().put(block, true).build(); + client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + } else { + client().admin().indices().prepareAddBlock(IndexMetadata.APIBlock.fromSetting(block), index).get(); + } } /** Sets or unsets the cluster read_only mode **/