From 761984ecf2be7729a476ce79cbee9651ad52b4dc Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 30 Jun 2020 09:33:15 +0200 Subject: [PATCH] Add index block api (#58094) Adds an API for putting an index block in place, which also ensures for write blocks that, once successfully returning to the user, all shards of the index are properly accounting for the block, for example that all in-flight writes to an index have been completed after adding the write block. This API allows coordinating more complex workflows, where it is crucial that an index is no longer receiving writes after the API completes, useful for example when marking an index as read-only during an upgrade in order to reindex its documents. --- .../client/RestHighLevelClientTests.java | 3 +- docs/reference/index-modules.asciidoc | 31 +- docs/reference/index-modules/blocks.asciidoc | 146 ++++++++ docs/reference/indices/open-close.asciidoc | 9 +- .../rest-api-spec/api/indices.add_block.json | 59 +++ .../test/indices.blocks/10_basic.yml | 33 ++ .../elasticsearch/blocks/SimpleBlocksIT.java | 344 ++++++++++++++++- .../elasticsearch/action/ActionModule.java | 5 + .../indices/readonly/AddIndexBlockAction.java | 32 ++ ...ddIndexBlockClusterStateUpdateRequest.java | 49 +++ .../readonly/AddIndexBlockRequest.java | 125 +++++++ .../readonly/AddIndexBlockRequestBuilder.java | 59 +++ .../readonly/AddIndexBlockResponse.java | 275 ++++++++++++++ .../TransportAddIndexBlockAction.java | 119 ++++++ .../TransportVerifyShardIndexBlockAction.java | 171 +++++++++ .../client/IndicesAdminClient.java | 27 +- .../client/support/AbstractClient.java | 21 +- .../cluster/block/ClusterBlock.java | 1 + .../cluster/metadata/IndexMetadata.java | 86 ++++- .../metadata/MetadataIndexStateService.java | 354 +++++++++++++++++- .../MetadataUpdateSettingsService.java | 13 +- .../indices/RestAddIndexBlockAction.java | 60 +++ .../elasticsearch/test/ESIntegTestCase.java | 8 +- 23 files changed, 1962 insertions(+), 68 deletions(-) create mode 100644 docs/reference/index-modules/blocks.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index dfd4a9b42b367..0deae22e4aa8e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -802,7 +802,8 @@ public void testApiNamingConventions() throws Exception { "render_search_template", "scripts_painless_execute", "indices.simulate_template", - "indices.resolve_index" + "indices.resolve_index", + "indices.add_block" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 2ff5d50cba249..bc3da4745d11c 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -173,35 +173,6 @@ specific index module: for the <>. Defaults to `3`. -`index.blocks.read_only`:: - - Set to `true` to make the index and index metadata read only, `false` to - allow writes and metadata changes. - -`index.blocks.read_only_allow_delete`:: - - Similar to `index.blocks.read_only`, but also allows deleting the index to - make more resources available. The <> may add and remove this block automatically. - -Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately. - -IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <>. - -`index.blocks.read`:: - - Set to `true` to disable read operations against the index. - -`index.blocks.write`:: - - Set to `true` to disable data write operations against the index. Unlike `read_only`, - this setting does not affect metadata. For instance, you can close an index with a `write` - block, but not an index with a `read_only` block. - -`index.blocks.metadata`:: - - Set to `true` to disable index metadata reads and writes. - `index.max_refresh_listeners`:: Maximum number of refresh listeners available on each shard of the index. @@ -321,6 +292,8 @@ include::index-modules/analysis.asciidoc[] include::index-modules/allocation.asciidoc[] +include::index-modules/blocks.asciidoc[] + include::index-modules/mapper.asciidoc[] include::index-modules/merge.asciidoc[] diff --git a/docs/reference/index-modules/blocks.asciidoc b/docs/reference/index-modules/blocks.asciidoc new file mode 100644 index 0000000000000..481fe010385a4 --- /dev/null +++ b/docs/reference/index-modules/blocks.asciidoc @@ -0,0 +1,146 @@ +[[index-modules-blocks]] +== Index blocks + +Index blocks limit the kind of operations that are available on a certain +index. The blocks come in different flavours, allowing to block write, +read, or metadata operations. The blocks can be set / removed using dynamic +index settings, or can be added using a dedicated API, which also ensures +for write blocks that, once successfully returning to the user, all shards +of the index are properly accounting for the block, for example that all +in-flight writes to an index have been completed after adding the write +block. + +[discrete] +[[index-block-settings]] +=== Index block settings + +The following _dynamic_ index settings determine the blocks present on an +index: + +`index.blocks.read_only`:: + + Set to `true` to make the index and index metadata read only, `false` to + allow writes and metadata changes. + +`index.blocks.read_only_allow_delete`:: + + Similar to `index.blocks.read_only`, but also allows deleting the index to + make more resources available. The <> may add and remove this block automatically. + +Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately. + +IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <>. + +`index.blocks.read`:: + + Set to `true` to disable read operations against the index. + +`index.blocks.write`:: + + Set to `true` to disable data write operations against the index. Unlike `read_only`, + this setting does not affect metadata. For instance, you can close an index with a `write` + block, but not an index with a `read_only` block. + +`index.blocks.metadata`:: + + Set to `true` to disable index metadata reads and writes. + +[discrete] +[[add-index-block]] +=== Add index block API + +Adds an index block to an index. + +[source,console] +-------------------------------------------------- +PUT /twitter/_block/write +-------------------------------------------------- +// TEST[setup:twitter] + + +[discrete] +[[add-index-block-api-request]] +==== {api-request-title} + +`PUT //_block/` + + +[discrete] +[role="child_attributes"] +[[add-index-block-api-path-params]] +==== {api-path-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index] ++ +To add blocks to all indices, use `_all` or `*`. To disallow the adding +of blocks to indices with `_all` or wildcard expressions, +change the `action.destructive_requires_name` cluster setting to `true`. +You can update this setting in the `elasticsearch.yml` file +or using the <> API. +``:: +(Required, string) +Block type to add to the index. ++ +.Valid values for `` +[%collapsible%open] +==== +`metadata`:: +Disable metadata changes, such as closing the index. + +`read`:: +Disable read operations. + +`read_only`:: +Disable write operations and metadata changes. + +`read_only_allow_delete`:: +Disable write operations and metadata changes. +Document deletion is disabled. +However, index deletion is still allowed. + +`write`:: +Disable write operations. However, metadata changes are still allowed. +==== +[discrete] +[[add-index-block-api-query-params]] +==== {api-query-parms-title} + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=allow-no-indices] ++ +Defaults to `true`. + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=expand-wildcards] ++ +Defaults to `open`. + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-ignore-unavailable] + +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms] + +[discrete] +[[add-index-block-api-example]] +==== {api-examples-title} + +The following example shows how to add an index block: + +[source,console] +-------------------------------------------------- +PUT /my_index/_block/write +-------------------------------------------------- +// TEST[s/^/PUT my_index\n/] + +The API returns following response: + +[source,console-result] +-------------------------------------------------- +{ + "acknowledged" : true, + "shards_acknowledged" : true, + "indices" : [ { + "name" : "my_index", + "blocked" : true + } ] +} +-------------------------------------------------- + diff --git a/docs/reference/indices/open-close.asciidoc b/docs/reference/indices/open-close.asciidoc index 03f2158e4f10a..d2a0ca957af6f 100644 --- a/docs/reference/indices/open-close.asciidoc +++ b/docs/reference/indices/open-close.asciidoc @@ -61,13 +61,20 @@ the current write index, the data stream must first be <> so that a new write index is created and then the previous write index can be closed. +// end::closed-index[] + +[[open-index-api-wait-for-active-shards]] ===== Wait For active shards +// tag::wait-for-active-shards[] + Because opening or closing an index allocates its shards, the <> setting on index creation applies to the `_open` and `_close` index actions as well. -// end::closed-index[] +// end::wait-for-active-shards[] + + [[open-index-api-path-params]] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json new file mode 100644 index 0000000000000..797e53bc2a2a7 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.add_block.json @@ -0,0 +1,59 @@ +{ + "indices.add_block":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-blocks.html", + "description":"Adds a block to an index." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/{index}/_block/{block}", + "methods":[ + "PUT" + ], + "parts":{ + "index":{ + "type":"list", + "description":"A comma separated list of indices to add a block to" + }, + "block":{ + "type":"string", + "description":"The block to add (one of read, write, read_only, metadata, read_only_allow_delete)" + } + } + } + ] + }, + "params":{ + "timeout":{ + "type":"time", + "description":"Explicit operation timeout" + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master" + }, + "ignore_unavailable":{ + "type":"boolean", + "description":"Whether specified concrete indices should be ignored when unavailable (missing or closed)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "default":"open", + "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml new file mode 100644 index 0000000000000..f87553f222eb4 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.blocks/10_basic.yml @@ -0,0 +1,33 @@ +--- +"Basic test for index blocks": + - skip: + version: " - 7.8.99" + reason: "index block APIs have only been made available in 7.9.0" + - do: + indices.create: + index: test_index + body: + settings: + number_of_replicas: 0 + + - do: + indices.add_block: + index: test_index + block: write + - is_true: acknowledged + + - do: + catch: /cluster_block_exception/ + index: + index: test_index + body: { foo: bar } + + - do: + search: + index: test_index + + - do: + indices.put_settings: + index: test_index + body: + index.blocks.write: false diff --git a/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java index dcb02a504da17..a52f476a2d87f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java @@ -19,19 +19,49 @@ package org.elasticsearch.blocks; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.action.support.IndicesOptions.lenientExpandOpen; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; +import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_ACCURATE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) @@ -83,11 +113,11 @@ public void testIndexReadWriteMetadataBlocks() { canCreateIndex("test1"); canIndexDocument("test1"); client().admin().indices().prepareUpdateSettings("test1") - .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true)) + .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) .execute().actionGet(); canNotIndexDocument("test1"); client().admin().indices().prepareUpdateSettings("test1") - .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, false)) + .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, false)) .execute().actionGet(); canIndexDocument("test1"); } @@ -143,11 +173,319 @@ private void canIndexExists(String index) { private void setIndexReadOnly(String index, Object value) { HashMap newSettings = new HashMap<>(); - newSettings.put(IndexMetadata.SETTING_READ_ONLY, value); + newSettings.put(SETTING_READ_ONLY, value); UpdateSettingsRequestBuilder settingsRequest = client().admin().indices().prepareUpdateSettings(index); settingsRequest.setSettings(newSettings); AcknowledgedResponse settingsResponse = settingsRequest.execute().actionGet(); assertThat(settingsResponse, notNullValue()); } + + + public void testAddBlocksWhileExistingBlocks() { + createIndex("test"); + ensureGreen("test"); + + for (APIBlock otherBlock : APIBlock.values()) { + + for (APIBlock block : Arrays.asList(APIBlock.READ, APIBlock.WRITE)) { + try { + enableIndexBlock("test", block.settingName()); + + // Adding a block is not blocked + AcknowledgedResponse addBlockResponse = client().admin().indices() + .prepareAddBlock(otherBlock, "test").get(); + assertAcked(addBlockResponse); + } finally { + disableIndexBlock("test", otherBlock.settingName()); + disableIndexBlock("test", block.settingName()); + } + } + + for (APIBlock block : Arrays.asList(APIBlock.READ_ONLY, APIBlock.METADATA, APIBlock.READ_ONLY_ALLOW_DELETE)) { + boolean success = false; + try { + enableIndexBlock("test", block.settingName()); + // Adding a block is blocked when there is a metadata block and the new block to be added is not a metadata block + if (block.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) && + otherBlock.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) == false) { + assertBlocked(client().admin().indices().prepareAddBlock(otherBlock, "test")); + } else { + assertAcked(client().admin().indices().prepareAddBlock(otherBlock, "test")); + success = true; + } + } finally { + if (success) { + disableIndexBlock("test", otherBlock.settingName()); + } + disableIndexBlock("test", block.settingName()); + } + } + } + } + + public void testAddBlockToMissingIndex() { + IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices() + .prepareAddBlock(randomFrom(APIBlock.values()),"test").get()); + assertThat(e.getMessage(), is("no such index [test]")); + } + + public void testAddBlockToOneMissingIndex() { + createIndex("test1"); + final IndexNotFoundException e = expectThrows(IndexNotFoundException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()),"test1", "test2").get()); + assertThat(e.getMessage(), is("no such index [test2]")); + } + + public void testCloseOneMissingIndexIgnoreMissing() throws Exception { + createIndex("test1"); + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertBusy(() -> assertAcked(client().admin().indices().prepareAddBlock(block, "test1", "test2") + .setIndicesOptions(lenientExpandOpen()))); + assertIndexHasBlock(block, "test1"); + } finally { + disableIndexBlock("test1", block); + } + } + + public void testAddBlockNoIndex() { + final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values())).get()); + assertThat(e.getMessage(), containsString("index is missing")); + } + + public void testAddBlockNullIndex() { + expectThrows(NullPointerException.class, + () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()), (String[])null)); + } + + public void testAddIndexBlock() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final int nbDocs = randomIntBetween(0, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + + client().admin().indices().prepareRefresh(indexName).get(); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs); + } + + public void testSameBlockTwice() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10)) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + } + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + // Second add block should be acked too, even if it was a METADATA block + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testAddBlockToUnassignedIndex() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + assertAcked(prepareCreate(indexName) + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build())); + + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + assertThat(clusterState.metadata().indices().get(indexName).getState(), is(IndexMetadata.State.OPEN)); + assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); + + final APIBlock block = randomFrom(APIBlock.values()); + try { + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testConcurrentAddBlock() throws InterruptedException { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final int nbDocs = randomIntBetween(10, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureYellowAndNoInitializingShards(indexName); + + final CountDownLatch startClosing = new CountDownLatch(1); + final Thread[] threads = new Thread[randomIntBetween(2, 5)]; + + final APIBlock block = randomFrom(APIBlock.values()); + + try { + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + startClosing.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + client().admin().indices().prepareAddBlock(block, indexName).get(); + assertIndexHasBlock(block, indexName); + } catch (final ClusterBlockException e) { + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + } + }); + threads[i].start(); + } + + startClosing.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + } + + public void testAddBlockWhileIndexingDocuments() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final APIBlock block = randomFrom(APIBlock.values()); + + int nbDocs = 0; + + try { + try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), 1000)) { + indexer.setFailureAssertion(t -> { + Throwable cause = ExceptionsHelper.unwrapCause(t); + assertThat(cause, instanceOf(ClusterBlockException.class)); + ClusterBlockException e = (ClusterBlockException) cause; + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + }); + + waitForDocs(randomIntBetween(10, 50), indexer); + assertAcked(client().admin().indices().prepareAddBlock(block, indexName)); + indexer.stopAndAwaitStopped(); + nbDocs += indexer.totalIndexedDocs(); + } + + assertIndexHasBlock(block, indexName); + } finally { + disableIndexBlock(indexName, block); + } + refresh(indexName); + assertHitCount(client().prepareSearch(indexName).setSize(0).setTrackTotalHitsUpTo(TRACK_TOTAL_HITS_ACCURATE).get(), nbDocs); + } + + public void testAddBlockWhileDeletingIndices() throws Exception { + final String[] indices = new String[randomIntBetween(3, 10)]; + for (int i = 0; i < indices.length; i++) { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + if (randomBoolean()) { + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, 10) + .mapToObj(n -> client().prepareIndex(indexName).setId(String.valueOf(n)).setSource("num", n)).collect(toList())); + } + indices[i] = indexName; + } + assertThat(client().admin().cluster().prepareState().get().getState().metadata().indices().size(), equalTo(indices.length)); + + final List threads = new ArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1); + + final APIBlock block = randomFrom(APIBlock.values()); + + Consumer exceptionConsumer = t -> { + Throwable cause = ExceptionsHelper.unwrapCause(t); + if (cause instanceof ClusterBlockException) { + ClusterBlockException e = (ClusterBlockException) cause; + assertThat(e.blocks(), hasSize(1)); + assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id())); + } else { + assertThat(cause, instanceOf(IndexNotFoundException.class)); + } + }; + + try { + for (final String indexToDelete : indices) { + threads.add(new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + assertAcked(client().admin().indices().prepareDelete(indexToDelete)); + } catch (final Exception e) { + exceptionConsumer.accept(e); + } + })); + } + for (final String indexToBlock : indices) { + threads.add(new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + try { + client().admin().indices().prepareAddBlock(block, indexToBlock).get(); + } catch (final Exception e) { + exceptionConsumer.accept(e); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + } finally { + for (final String indexToBlock : indices) { + try { + disableIndexBlock(indexToBlock, block); + } catch (IndexNotFoundException infe) { + // ignore + } + } + } + } + + static void assertIndexHasBlock(APIBlock block, final String... indices) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + final IndexMetadata indexMetadata = clusterState.metadata().indices().get(index); + final Settings indexSettings = indexMetadata.getSettings(); + assertThat(indexSettings.hasValue(block.settingName()), is(true)); + assertThat(indexSettings.getAsBoolean(block.settingName(), false), is(true)); + assertThat(clusterState.blocks().hasIndexBlock(index, block.getBlock()), is(true)); + assertThat("Index " + index + " must have only 1 block with [id=" + block.getBlock().id() + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == block.getBlock().id()).count(), equalTo(1L)); + } + } + + public static void disableIndexBlock(String index, APIBlock block) { + disableIndexBlock(index, block.settingName()); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index bbbde24672b8d..7d537652cbddd 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -142,6 +142,8 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction; import org.elasticsearch.action.admin.indices.open.OpenIndexAction; import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction; +import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; @@ -300,6 +302,7 @@ import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction; import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction; +import org.elasticsearch.rest.action.admin.indices.RestAddIndexBlockAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; @@ -549,6 +552,7 @@ public void reg actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class); actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class); actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class); + actions.register(AddIndexBlockAction.INSTANCE, TransportAddIndexBlockAction.class); actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class); actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class); @@ -701,6 +705,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestDeleteIndexAction()); registerHandler.accept(new RestCloseIndexAction()); registerHandler.accept(new RestOpenIndexAction()); + registerHandler.accept(new RestAddIndexBlockAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java new file mode 100644 index 0000000000000..a993ba5b7004d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockAction.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.ActionType; + +public class AddIndexBlockAction extends ActionType { + + public static final AddIndexBlockAction INSTANCE = new AddIndexBlockAction(); + public static final String NAME = "indices:admin/block/add"; + + private AddIndexBlockAction() { + super(NAME, AddIndexBlockResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java new file mode 100644 index 0000000000000..92f5329c069c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; + +/** + * Cluster state update request that allows to add a block to one or more indices + */ +public class AddIndexBlockClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { + + private final APIBlock block; + private long taskId; + + public AddIndexBlockClusterStateUpdateRequest(final APIBlock block, final long taskId) { + this.block = block; + this.taskId = taskId; + } + + public long taskId() { + return taskId; + } + + public APIBlock getBlock() { + return block; + } + + public AddIndexBlockClusterStateUpdateRequest taskId(final long taskId) { + this.taskId = taskId; + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java new file mode 100644 index 0000000000000..72e8707f969ed --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.CollectionUtils; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * A request to add a block to an index. + */ +public class AddIndexBlockRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private final APIBlock block; + private String[] indices; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + + public AddIndexBlockRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + block = APIBlock.readFrom(in); + } + + /** + * Constructs a new request for the specified block and indices + */ + public AddIndexBlockRequest(APIBlock block, String... indices) { + this.block = Objects.requireNonNull(block); + this.indices = Objects.requireNonNull(indices); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (CollectionUtils.isEmpty(indices)) { + validationException = addValidationError("index is missing", validationException); + } + return validationException; + } + + /** + * Returns the indices to be blocked + */ + @Override + public String[] indices() { + return indices; + } + + /** + * Sets the indices to be blocked + * @param indices the indices to be blocked + * @return the request itself + */ + @Override + public AddIndexBlockRequest indices(String... indices) { + this.indices = Objects.requireNonNull(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and how to deal with wildcard expressions. + * For example indices that don't exist. + * + * @return the desired behaviour regarding indices to ignore and wildcard indices expressions + */ + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + /** + * Specifies what type of requested indices to ignore and how to deal wild wildcard expressions. + * For example indices that don't exist. + * + * @param indicesOptions the desired behaviour regarding indices to ignore and wildcard indices expressions + * @return the request itself + */ + public AddIndexBlockRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * Returns the block to be added + */ + public APIBlock getBlock() { + return block; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + block.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java new file mode 100644 index 0000000000000..ba2ba5434d7cb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequestBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; + +/** + * Builder for add index block request + */ +public class AddIndexBlockRequestBuilder + extends AcknowledgedRequestBuilder { + + public AddIndexBlockRequestBuilder(ElasticsearchClient client, AddIndexBlockAction action, APIBlock block, String... indices) { + super(client, action, new AddIndexBlockRequest(block, indices)); + } + + /** + * Sets the indices to be blocked + * + * @param indices the indices to be blocked + * @return the request itself + */ + public AddIndexBlockRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and wildcard indices expressions + * For example indices that don't exist. + * + * @param indicesOptions the desired behaviour regarding indices to ignore and indices wildcard expressions + * @return the request itself + */ + public AddIndexBlockRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { + request.indicesOptions(indicesOptions); + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java new file mode 100644 index 0000000000000..ac323e3ce4e0b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockResponse.java @@ -0,0 +1,275 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.unmodifiableList; + +public class AddIndexBlockResponse extends ShardsAcknowledgedResponse { + + private final List indices; + + AddIndexBlockResponse(StreamInput in) throws IOException { + super(in, true); + indices = unmodifiableList(in.readList(AddBlockResult::new)); + } + + public AddIndexBlockResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List indices) { + super(acknowledged, shardsAcknowledged); + this.indices = unmodifiableList(Objects.requireNonNull(indices)); + } + + public List getIndices() { + return indices; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeShardsAcknowledged(out); + out.writeList(indices); + } + + @Override + protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startArray("indices"); + for (AddBlockResult index : indices) { + index.toXContent(builder, params); + } + builder.endArray(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class AddBlockResult implements Writeable, ToXContentFragment { + + private final Index index; + private final @Nullable Exception exception; + private final @Nullable AddBlockShardResult[] shards; + + public AddBlockResult(final Index index) { + this(index, null, null); + } + + public AddBlockResult(final Index index, final Exception failure) { + this(index, Objects.requireNonNull(failure), null); + } + + public AddBlockResult(final Index index, final AddBlockShardResult[] shards) { + this(index, null, Objects.requireNonNull(shards)); + } + + private AddBlockResult(final Index index, @Nullable final Exception exception, @Nullable final AddBlockShardResult[] shards) { + this.index = Objects.requireNonNull(index); + this.exception = exception; + this.shards = shards; + } + + AddBlockResult(final StreamInput in) throws IOException { + this.index = new Index(in); + this.exception = in.readException(); + this.shards = in.readOptionalArray(AddBlockShardResult::new, AddBlockShardResult[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + index.writeTo(out); + out.writeException(exception); + out.writeOptionalArray(shards); + } + + public Index getIndex() { + return index; + } + + public Exception getException() { + return exception; + } + + public AddBlockShardResult[] getShards() { + return shards; + } + + public boolean hasFailures() { + if (exception != null) { + return true; + } + if (shards != null) { + for (AddBlockShardResult shard : shards) { + if (shard.hasFailures()) { + return true; + } + } + } + return false; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("name", index.getName()); + if (hasFailures()) { + if (exception != null) { + builder.startObject("exception"); + ElasticsearchException.generateFailureXContent(builder, params, exception, true); + builder.endObject(); + } else { + builder.startArray("failed_shards"); + for (AddBlockShardResult shard : shards) { + if (shard.hasFailures()) { + shard.toXContent(builder, params); + } + } + builder.endArray(); + } + } else { + builder.field("blocked", true); + } + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + } + + public static class AddBlockShardResult implements Writeable, ToXContentFragment { + + private final int id; + private final Failure[] failures; + + public AddBlockShardResult(final int id, final Failure[] failures) { + this.id = id; + this.failures = failures; + } + + AddBlockShardResult(final StreamInput in) throws IOException { + this.id = in.readVInt(); + this.failures = in.readOptionalArray(Failure::readFailure, Failure[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeVInt(id); + out.writeOptionalArray(failures); + } + + public boolean hasFailures() { + return CollectionUtils.isEmpty(failures) == false; + } + + public int getId() { + return id; + } + + public Failure[] getFailures() { + return failures; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("id", String.valueOf(id)); + builder.startArray("failures"); + if (failures != null) { + for (Failure failure : failures) { + failure.toXContent(builder, params); + } + } + builder.endArray(); + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Failure extends DefaultShardOperationFailedException { + + private @Nullable String nodeId; + + private Failure(StreamInput in) throws IOException { + super(in); + nodeId = in.readOptionalString(); + } + + public Failure(final String index, final int shardId, final Throwable reason) { + this(index, shardId, reason, null); + } + + public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) { + super(index, shardId, reason); + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(nodeId); + } + + @Override + public XContentBuilder innerToXContent(final XContentBuilder builder, final Params params) throws IOException { + if (nodeId != null) { + builder.field("node", nodeId); + } + return super.innerToXContent(builder, params); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static Failure readFailure(final StreamInput in) throws IOException { + return new Failure(in); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java new file mode 100644 index 0000000000000..b831882549846 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.readonly; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataIndexStateService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.Index; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; + +/** + * Adds a single index level block to a given set of indices. Not only does it set the correct setting, + * but it ensures that, in case of a write block, once successfully returning to the user, all shards + * of the index are properly accounting for the block, for instance, when adding a write block all + * in-flight writes to an index have been completed prior to the response being returned. These actions + * are done in multiple cluster state updates (at least two). See also {@link TransportVerifyShardIndexBlockAction} + * for the eventual delegation for shard-level verification. + */ +public class TransportAddIndexBlockAction extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportAddIndexBlockAction.class); + + private final MetadataIndexStateService indexStateService; + private final DestructiveOperations destructiveOperations; + + @Inject + public TransportAddIndexBlockAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, MetadataIndexStateService indexStateService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + DestructiveOperations destructiveOperations) { + super(AddIndexBlockAction.NAME, transportService, clusterService, threadPool, actionFilters, AddIndexBlockRequest::new, + indexNameExpressionResolver); + this.indexStateService = indexStateService; + this.destructiveOperations = destructiveOperations; + } + + @Override + protected String executor() { + // no need to use a thread pool, we go async right away + return ThreadPool.Names.SAME; + } + + @Override + protected AddIndexBlockResponse read(StreamInput in) throws IOException { + return new AddIndexBlockResponse(in); + } + + @Override + protected void doExecute(Task task, AddIndexBlockRequest request, ActionListener listener) { + destructiveOperations.failDestructive(request.indices()); + super.doExecute(task, request, listener); + } + + @Override + protected ClusterBlockException checkBlock(AddIndexBlockRequest request, ClusterState state) { + if (request.getBlock().getBlock().levels().contains(ClusterBlockLevel.METADATA_WRITE) && + state.blocks().global(ClusterBlockLevel.METADATA_WRITE).isEmpty()) { + return null; + } + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request)); + } + + @Override + protected void masterOperation(final Task task, + final AddIndexBlockRequest request, + final ClusterState state, + final ActionListener listener) throws Exception { + final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + if (concreteIndices == null || concreteIndices.length == 0) { + listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList())); + return; + } + + final AddIndexBlockClusterStateUpdateRequest addBlockRequest = new AddIndexBlockClusterStateUpdateRequest(request.getBlock(), + task.getId()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()) + .indices(concreteIndices); + indexStateService.addIndexBlock(addBlockRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug(() -> new ParameterizedMessage("failed to mark indices as readonly [{}]", (Object) concreteIndices), t); + delegatedListener.onFailure(t); + })); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java new file mode 100644 index 0000000000000..73990cb3ab75e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.readonly; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +/** + * Action used to verify whether shards have properly applied a given index block, + * and are no longer executing any operations in violation of that block. This action + * requests all operation permits of the shard in order to wait for all write operations + * to complete. + */ +public class TransportVerifyShardIndexBlockAction extends TransportReplicationAction< + TransportVerifyShardIndexBlockAction.ShardRequest, TransportVerifyShardIndexBlockAction.ShardRequest, ReplicationResponse> { + + public static final String NAME = AddIndexBlockAction.NAME + "[s]"; + public static final ActionType TYPE = new ActionType<>(NAME, ReplicationResponse::new); + protected Logger logger = LogManager.getLogger(getClass()); + + @Inject + public TransportVerifyShardIndexBlockAction(final Settings settings, final TransportService transportService, + final ClusterService clusterService, final IndicesService indicesService, + final ThreadPool threadPool, final ShardStateAction stateAction, + final ActionFilters actionFilters) { + super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, + ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT); + } + + @Override + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); + } + + @Override + protected void acquirePrimaryOperationPermit(final IndexShard primary, + final ShardRequest request, + final ActionListener onAcquired) { + primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout()); + } + + @Override + protected void acquireReplicaOperationPermit(final IndexShard replica, + final ShardRequest request, + final ActionListener onAcquired, + final long primaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdateOrDeletes) { + replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout()); + } + + @Override + protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, primary); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + }); + } + + @Override + protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, replica); + return new ReplicaResult(); + }); + } + + private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { + final ShardId shardId = indexShard.shardId(); + if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) { + throw new IllegalStateException("index shard " + shardId + + " is not blocking all operations while waiting for block " + request.clusterBlock()); + } + + final ClusterBlocks clusterBlocks = clusterService.state().blocks(); + if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { + throw new IllegalStateException("index shard " + shardId + " has not applied block " + request.clusterBlock()); + } + } + + @Override + protected ReplicationOperation.Replicas newReplicasProxy() { + return new VerifyShardReadOnlyActionReplicasProxy(); + } + + /** + * A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification + * and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary + * or reopened in an unverified state with potential non flushed translog operations. + */ + class VerifyShardReadOnlyActionReplicasProxy extends ReplicasProxy { + @Override + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm, + final ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); + } + } + + public static class ShardRequest extends ReplicationRequest { + + private final ClusterBlock clusterBlock; + + ShardRequest(StreamInput in) throws IOException { + super(in); + clusterBlock = new ClusterBlock(in); + } + + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { + super(shardId); + this.clusterBlock = Objects.requireNonNull(clusterBlock); + setParentTask(parentTaskId); + } + + @Override + public String toString() { + return "verify shard " + shardId + " before block with " + clusterBlock; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + clusterBlock.writeTo(out); + } + + public ClusterBlock clusterBlock() { + return clusterBlock; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 221df8f93362c..f63ecd4d39e7e 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -21,9 +21,6 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder; @@ -42,6 +39,9 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -73,6 +73,9 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; @@ -117,6 +120,7 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.common.Nullable; /** @@ -347,6 +351,23 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void open(OpenIndexRequest request, ActionListener listener); + /** + * Adds a block to an index + * + * @param block The block to add + * @param indices The name of the indices to add the block to + */ + AddIndexBlockRequestBuilder prepareAddBlock(APIBlock block, String... indices); + + /** + * Adds a block to an index + * + * @param request The add index block request + * @param listener A listener to be notified with a result + * @see org.elasticsearch.client.Requests#openIndexRequest(String) + */ + void addBlock(AddIndexBlockRequest request, ActionListener listener); + /** * Opens one or more indices based on their index name. * diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 286b62c42ac26..daab9926ef945 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,9 +30,6 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; -import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; -import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -169,6 +166,9 @@ import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; @@ -211,6 +211,10 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder; @@ -347,6 +351,7 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.FilterClient; import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -1438,6 +1443,16 @@ public void open(final OpenIndexRequest request, final ActionListener listener) { + execute(AddIndexBlockAction.INSTANCE, request, listener); + } + @Override public OpenIndexRequestBuilder prepareOpen(String... indices) { return new OpenIndexRequestBuilder(this, OpenIndexAction.INSTANCE, indices); diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index a0af5aa88c1fe..5efbdffd40837 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -86,6 +86,7 @@ public int id() { return this.id; } + @Nullable public String uuid() { return uuid; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 6427cd8dc3745..de9f86cbb95b6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -188,25 +189,80 @@ public Iterator> settings() { public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; - public static final String SETTING_READ_ONLY = "index.blocks.read_only"; - public static final Setting INDEX_READ_ONLY_SETTING = - Setting.boolSetting(SETTING_READ_ONLY, false, Property.Dynamic, Property.IndexScope); - public static final String SETTING_BLOCKS_READ = "index.blocks.read"; - public static final Setting INDEX_BLOCKS_READ_SETTING = - Setting.boolSetting(SETTING_BLOCKS_READ, false, Property.Dynamic, Property.IndexScope); + public enum APIBlock implements Writeable { + READ_ONLY("read_only", INDEX_READ_ONLY_BLOCK), + READ("read", INDEX_READ_BLOCK), + WRITE("write", INDEX_WRITE_BLOCK), + METADATA("metadata", INDEX_METADATA_BLOCK), + READ_ONLY_ALLOW_DELETE("read_only_allow_delete", INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); - public static final String SETTING_BLOCKS_WRITE = "index.blocks.write"; - public static final Setting INDEX_BLOCKS_WRITE_SETTING = - Setting.boolSetting(SETTING_BLOCKS_WRITE, false, Property.Dynamic, Property.IndexScope); + final String name; + final String settingName; + final Setting setting; + final ClusterBlock block; - public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata"; - public static final Setting INDEX_BLOCKS_METADATA_SETTING = - Setting.boolSetting(SETTING_BLOCKS_METADATA, false, Property.Dynamic, Property.IndexScope); + APIBlock(String name, ClusterBlock block) { + this.name = name; + this.settingName = "index.blocks." + name; + this.setting = Setting.boolSetting(settingName, false, Property.Dynamic, Property.IndexScope); + this.block = block; + } + + public String settingName() { + return settingName; + } + + public Setting setting() { + return setting; + } + + public ClusterBlock getBlock() { + return block; + } + + public static APIBlock fromName(String name) { + for (APIBlock block : APIBlock.values()) { + if (block.name.equals(name)) { + return block; + } + } + throw new IllegalArgumentException("No block found with name " + name); + } + + public static APIBlock fromSetting(String settingName) { + for (APIBlock block : APIBlock.values()) { + if (block.settingName.equals(settingName)) { + return block; + } + } + throw new IllegalArgumentException("No block found with setting name " + settingName); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(ordinal()); + } + + public static APIBlock readFrom(StreamInput input) throws IOException { + return APIBlock.values()[input.readVInt()]; + } + } + + public static final String SETTING_READ_ONLY = APIBlock.READ_ONLY.settingName(); + public static final Setting INDEX_READ_ONLY_SETTING = APIBlock.READ_ONLY.setting(); + + public static final String SETTING_BLOCKS_READ = APIBlock.READ.settingName(); + public static final Setting INDEX_BLOCKS_READ_SETTING = APIBlock.READ.setting(); + + public static final String SETTING_BLOCKS_WRITE = APIBlock.WRITE.settingName(); + public static final Setting INDEX_BLOCKS_WRITE_SETTING = APIBlock.WRITE.setting(); + + public static final String SETTING_BLOCKS_METADATA = APIBlock.METADATA.settingName(); + public static final Setting INDEX_BLOCKS_METADATA_SETTING = APIBlock.METADATA.setting(); - public static final String SETTING_READ_ONLY_ALLOW_DELETE = "index.blocks.read_only_allow_delete"; - public static final Setting INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = - Setting.boolSetting(SETTING_READ_ONLY_ALLOW_DELETE, false, Property.Dynamic, Property.IndexScope); + public static final String SETTING_READ_ONLY_ALLOW_DELETE = APIBlock.READ_ONLY_ALLOW_DELETE.settingName(); + public static final Setting INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = APIBlock.READ_ONLY_ALLOW_DELETE.setting(); public static final String SETTING_VERSION_CREATED = "index.version.created"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index bdcca625a61cb..f1f833d840e3a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockResult; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockShardResult; +import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -44,6 +49,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -91,7 +97,7 @@ import static java.util.Collections.unmodifiableMap; /** - * Service responsible for submitting open/close index requests + * Service responsible for submitting open/close index requests as well as for adding index blocks */ public class MetadataIndexStateService { private static final Logger logger = LogManager.getLogger(MetadataIndexStateService.class); @@ -303,6 +309,176 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map> addIndexBlock(final Index[] indices, final ClusterState currentState, + final APIBlock block) { + final Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + + final Set indicesToAddBlock = new HashSet<>(); + for (Index index : indices) { + metadata.getSafe(index); // to check if index exists + if (currentState.blocks().hasIndexBlock(index.getName(), block.block)) { + logger.debug("index {} already has block {}, ignoring", index, block.block); + } else { + indicesToAddBlock.add(index); + } + } + + if (indicesToAddBlock.isEmpty()) { + return Tuple.tuple(currentState, Collections.emptyMap()); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + final Map blockedIndices = new HashMap<>(); + + for (Index index : indicesToAddBlock) { + ClusterBlock indexBlock = null; + final Set clusterBlocks = currentState.blocks().indices().get(index.getName()); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == block.block.id()) { + // Reuse the existing UUID-based block + indexBlock = clusterBlock; + break; + } + } + } + if (indexBlock == null) { + // Create a new UUID-based block + indexBlock = createUUIDBasedBlock(block.block); + } + assert Strings.hasLength(indexBlock.uuid()) : "Block should have a UUID"; + blocks.addIndexBlock(index.getName(), indexBlock); + blockedIndices.put(index, indexBlock); + // update index settings as well to match the block + final IndexMetadata indexMetadata = metadata.getSafe(index); + if (block.setting().get(indexMetadata.getSettings()) == false) { + final Settings updatedSettings = Settings.builder() + .put(indexMetadata.getSettings()).put(block.settingName(), true).build(); + + metadata.put(IndexMetadata.builder(indexMetadata) + .settings(updatedSettings) + .settingsVersion(indexMetadata.getSettingsVersion() + 1)); + } + } + + logger.info("adding block {} to indices {}", block.name, + blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.toList())); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata) + .routingTable(routingTable.build()).build(), blockedIndices); + } + + /** + * Adds an index block based on the given request, and notifies the listener upon completion. + * Adding blocks is done in three steps: + * - First, a temporary UUID-based block is added to the index + * (see {@link #addIndexBlock(Index[], ClusterState, APIBlock)}. + * - Second, shards are checked to have properly applied the UUID-based block. + * (see {@link WaitForBlocksApplied}). + * - Third, the temporary UUID-based block is turned into a full block + * (see {@link #finalizeBlock(ClusterState, Map, Map, APIBlock)}. + * Using this three-step process ensures non-interference by other operations in case where + * we notify successful completion here. + */ + public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request, + ActionListener listener) { + final Index[] concreteIndices = request.indices(); + if (concreteIndices == null || concreteIndices.length == 0) { + throw new IllegalArgumentException("Index name is required"); + } + List writeIndices = new ArrayList<>(); + SortedMap lookup = clusterService.state().metadata().getIndicesLookup(); + for (Index index : concreteIndices) { + IndexAbstraction ia = lookup.get(index.getName()); + if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) { + writeIndices.add(index.getName()); + } + } + if (writeIndices.size() > 0) { + throw new IllegalArgumentException("cannot add a block to the following data stream write indices [" + + Strings.collectionToCommaDelimitedString(writeIndices) + "]"); + } + + clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices), + new ClusterStateUpdateTask(Priority.URGENT) { + + private Map blockedIndices; + + @Override + public ClusterState execute(final ClusterState currentState) { + final Tuple> tup = + addIndexBlock(concreteIndices, currentState, request.getBlock()); + blockedIndices = tup.v2(); + return tup.v1(); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + if (oldState == newState) { + assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; + listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList())); + } else { + assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; + threadPool.executor(ThreadPool.Names.MANAGEMENT) + .execute(new WaitForBlocksApplied(blockedIndices, request, + ActionListener.wrap(verifyResults -> + clusterService.submitStateUpdateTask("finalize-index-block-[" + request.getBlock().name + + "]-[" + blockedIndices.keySet().stream().map(Index::getName) + .collect(Collectors.joining(", ")) + "]", + new ClusterStateUpdateTask(Priority.URGENT) { + private final List indices = new ArrayList<>(); + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + Tuple> addBlockResult = + finalizeBlock(currentState, blockedIndices, verifyResults, request.getBlock()); + assert verifyResults.size() == addBlockResult.v2().size(); + indices.addAll(addBlockResult.v2()); + return addBlockResult.v1(); + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(final String source, + final ClusterState oldState, + final ClusterState newState) { + + final boolean acknowledged = indices.stream().noneMatch( + AddBlockResult::hasFailures); + listener.onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices)); + } + }), + listener::onFailure) + ) + ); + } + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + } + ); + } + /** * Step 2 - Wait for indices to be ready for closing *

@@ -428,6 +604,112 @@ public void onFailure(Exception e) { } } + /** + * Helper class that coordinates with shards to ensure that blocks have been properly applied to all shards using + * {@link TransportVerifyShardIndexBlockAction}. + */ + class WaitForBlocksApplied extends ActionRunnable> { + + private final Map blockedIndices; + private final AddIndexBlockClusterStateUpdateRequest request; + + private WaitForBlocksApplied(final Map blockedIndices, + final AddIndexBlockClusterStateUpdateRequest request, + final ActionListener> listener) { + super(listener); + if (blockedIndices == null || blockedIndices.isEmpty()) { + throw new IllegalArgumentException("Cannot wait for blocks to be applied, list of blocked indices is empty or null"); + } + this.blockedIndices = blockedIndices; + this.request = request; + } + + @Override + protected void doRun() throws Exception { + final Map results = ConcurrentCollections.newConcurrentMap(); + final CountDown countDown = new CountDown(blockedIndices.size()); + final ClusterState state = clusterService.state(); + blockedIndices.forEach((index, block) -> { + waitForShardsReady(index, block, state, response -> { + results.put(index, response); + if (countDown.countDown()) { + listener.onResponse(unmodifiableMap(results)); + } + }); + }); + } + + private void waitForShardsReady(final Index index, + final ClusterBlock clusterBlock, + final ClusterState state, + final Consumer onResponse) { + final IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata == null) { + logger.debug("index {} has since been deleted, ignoring", index); + onResponse.accept(new AddBlockResult(index)); + return; + } + final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); + if (indexRoutingTable == null || indexMetadata.getState() == IndexMetadata.State.CLOSE) { + logger.debug("index {} is closed, no need to wait for shards, ignoring", index); + onResponse.accept(new AddBlockResult(index)); + return; + } + + final ImmutableOpenIntMap shards = indexRoutingTable.getShards(); + final AtomicArray results = new AtomicArray<>(shards.size()); + final CountDown countDown = new CountDown(shards.size()); + + for (IntObjectCursor shard : shards) { + final IndexShardRoutingTable shardRoutingTable = shard.value; + final int shardId = shardRoutingTable.shardId().id(); + sendVerifyShardBlockRequest(shardRoutingTable, clusterBlock, new NotifyOnceListener() { + @Override + public void innerOnResponse(final ReplicationResponse replicationResponse) { + AddBlockShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures()) + .map(f -> new AddBlockShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId())) + .toArray(AddBlockShardResult.Failure[]::new); + results.setOnce(shardId, new AddBlockShardResult(shardId, failures)); + processIfFinished(); + } + + @Override + public void innerOnFailure(final Exception e) { + AddBlockShardResult.Failure failure = new AddBlockShardResult.Failure(index.getName(), shardId, e); + results.setOnce(shardId, new AddBlockShardResult(shardId, new AddBlockShardResult.Failure[]{failure})); + processIfFinished(); + } + + private void processIfFinished() { + if (countDown.countDown()) { + onResponse.accept(new AddBlockResult(index, results.toArray(new AddBlockShardResult[results.length()]))); + } + } + }); + } + } + + private void sendVerifyShardBlockRequest(final IndexShardRoutingTable shardRoutingTable, + final ClusterBlock block, + final ActionListener listener) { + final ShardId shardId = shardRoutingTable.shardId(); + if (shardRoutingTable.primaryShard().unassigned()) { + logger.debug("primary shard {} is unassigned, ignoring", shardId); + final ReplicationResponse response = new ReplicationResponse(); + response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size())); + listener.onResponse(response); + return; + } + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); + final TransportVerifyShardIndexBlockAction.ShardRequest shardRequest = + new TransportVerifyShardIndexBlockAction.ShardRequest(shardId, block, parentTaskId); + if (request.ackTimeout() != null) { + shardRequest.timeout(request.ackTimeout()); + } + client.executeLocally(TransportVerifyShardIndexBlockAction.TYPE, shardRequest, listener); + } + } + /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ @@ -617,6 +899,68 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState) return ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); } + /** + * Finalizes the addition of blocks by turning the temporary UUID-based blocks into full blocks. + * @param currentState the cluster state to update + * @param blockedIndices the indices and their temporary UUID-based blocks to convert + * @param verifyResult the index-level results for adding the block + * @param block the full block to convert to + * @return the updated cluster state, as well as the (failed and successful) index-level results for adding the block + */ + static Tuple> finalizeBlock(final ClusterState currentState, + final Map blockedIndices, + final Map verifyResult, + final APIBlock block) { + + final Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + + final Set effectivelyBlockedIndices = new HashSet<>(); + Map blockingResults = new HashMap<>(verifyResult); + for (Map.Entry result : verifyResult.entrySet()) { + final Index index = result.getKey(); + final boolean acknowledged = result.getValue().hasFailures() == false; + try { + if (acknowledged == false) { + logger.debug("verification of shards before blocking {} failed [{}]", index, result); + continue; + } + final IndexMetadata indexMetadata = metadata.getSafe(index); + final ClusterBlock tempBlock = blockedIndices.get(index); + assert tempBlock != null; + assert tempBlock.uuid() != null; + final ClusterBlock currentBlock = currentState.blocks().getIndexBlockWithId(index.getName(), tempBlock.id()); + if (currentBlock != null && currentBlock.equals(block.block)) { + logger.debug("verification of shards for {} succeeded, but block finalization already occurred" + + " (possibly for another block) [{}]", index, result); + continue; + } + + if (currentBlock == null || currentBlock.equals(tempBlock) == false) { + // we should report error in this case as the index can be left as open. + blockingResults.put(result.getKey(), new AddBlockResult(result.getKey(), new IllegalStateException( + "verification of shards before blocking " + index + " succeeded but block has been removed in the meantime"))); + logger.debug("verification of shards before blocking {} succeeded but block has been removed in the meantime", index); + continue; + } + + assert currentBlock != null && currentBlock.equals(tempBlock) && currentBlock.id() == block.block.id(); + + blocks.removeIndexBlockWithId(index.getName(), tempBlock.id()); + blocks.addIndexBlock(index.getName(), block.block); + + logger.debug("add block {} to index {} succeeded", block.block, index); + effectivelyBlockedIndices.add(index.getName()); + } catch (final IndexNotFoundException e) { + logger.debug("index {} has been deleted since blocking it started, ignoring", index); + } + } + logger.info("completed adding block {} to indices {}", block.name, effectivelyBlockedIndices); + return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata).routingTable(routingTable.build()).build(), + blockingResults.values()); + } + /** * @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID. @@ -632,4 +976,12 @@ public static boolean isIndexVerifiedBeforeClosed(final IndexMetadata indexMetad && VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetadata.getSettings()) && VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetadata.getSettings()); } + + // Create UUID based block based on non-UUID one + public static ClusterBlock createUUIDBasedBlock(ClusterBlock clusterBlock) { + assert clusterBlock.uuid() == null : "no UUID expected on source block"; + return new ClusterBlock(clusterBlock.id(), UUIDs.randomBase64UUID(), "moving to block " + clusterBlock.description(), + clusterBlock.retryable(), clusterBlock.disableStatePersistence(), clusterBlock.isAllowReleaseResources(), clusterBlock.status(), + clusterBlock.levels()); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index a797f0abbe461..a308af9222331 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -178,16 +178,9 @@ public ClusterState execute(ClusterState currentState) { } ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_BLOCK, - IndexMetadata.INDEX_READ_ONLY_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK, - IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_METADATA_BLOCK, - IndexMetadata.INDEX_BLOCKS_METADATA_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_WRITE_BLOCK, - IndexMetadata.INDEX_BLOCKS_WRITE_SETTING, openSettings); - maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_BLOCK, - IndexMetadata.INDEX_BLOCKS_READ_SETTING, openSettings); + for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { + maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); + } if (!openIndices.isEmpty()) { for (Index index : openIndices) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java new file mode 100644 index 0000000000000..47201b29966cd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestAddIndexBlockAction.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +public class RestAddIndexBlockAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of( + new Route(PUT, "/{index}/_block/{block}")); + } + + @Override + public String getName() { + return "add_index_block_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest( + IndexMetadata.APIBlock.fromName(request.param("block")), + Strings.splitStringByCommaToArray(request.param("index"))); + addIndexBlockRequest.masterNodeTimeout(request.paramAsTime("master_timeout", addIndexBlockRequest.masterNodeTimeout())); + addIndexBlockRequest.timeout(request.paramAsTime("timeout", addIndexBlockRequest.timeout())); + addIndexBlockRequest.indicesOptions(IndicesOptions.fromRequest(request, addIndexBlockRequest.indicesOptions())); + return channel -> client.admin().indices().addBlock(addIndexBlockRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 760d199585b63..e6ce1ede36d20 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1572,8 +1572,12 @@ public static void disableIndexBlock(String index, String block) { /** Enables an index block for the specified index */ public static void enableIndexBlock(String index, String block) { - Settings settings = Settings.builder().put(block, true).build(); - client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + if (randomBoolean()) { + Settings settings = Settings.builder().put(block, true).build(); + client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + } else { + client().admin().indices().prepareAddBlock(IndexMetadata.APIBlock.fromSetting(block), index).get(); + } } /** Sets or unsets the cluster read_only mode **/