Merge branch 'main' into 37265_fix_dashbord_images
kibanamachine authored Jul 27, 2022
2 parents 3746c8e + 0fe8d3f commit fb88527
Showing 99 changed files with 6,937 additions and 1,934 deletions.


21 changes: 10 additions & 11 deletions src/core/server/saved_objects/migrations/README.md
@@ -149,7 +149,7 @@ index.

### New control state
1. Two conditions have to be met before migrations begin:
  1. The Elasticsearch shard allocation cluster setting `cluster.routing.allocation.enable` needs to be unset or set to 'all'. When set to 'primaries', 'new_primaries' or 'none', the migration will time out while waiting for index yellow status before bulk indexing, because the replica cannot be allocated.
  1. The Elasticsearch shard allocation cluster setting `cluster.routing.allocation.enable` needs to be unset or set to 'all'. When set to 'primaries', 'new_primaries' or 'none', the migration will time out while waiting for index green status before bulk indexing, because the replica cannot be allocated.

As per the Elasticsearch docs (https://www.elastic.co/guide/en/elasticsearch/reference/8.2/restart-cluster.html#restart-cluster-rolling), when Cloud performs a rolling restart, such as during an upgrade, it temporarily disables shard allocation. Kibana therefore keeps retrying the INIT step, waiting for shard allocation to be enabled again.
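
A minimal sketch of how this precondition could be checked against the cluster settings API using the Elasticsearch JS client (an illustration only, not Kibana's actual INIT implementation; the standalone client setup is an assumption):

```ts
import { Client } from '@elastic/elasticsearch';

// Hypothetical standalone check; the real INIT action runs inside the
// migrations state machine and retries with its own wrapped client.
const client = new Client({ node: 'http://localhost:9200' });

async function shardAllocationAllowsMigration(): Promise<boolean> {
  const settings = await client.cluster.getSettings({ flat_settings: true });
  const enable =
    settings.transient?.['cluster.routing.allocation.enable'] ??
    settings.persistent?.['cluster.routing.allocation.enable'];
  // Unset falls back to the default of 'all'; 'primaries', 'new_primaries' or
  // 'none' would leave replicas unassigned and the migration would time out.
  return enable === undefined || enable === 'all';
}
```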

@@ -182,12 +182,12 @@ and the migration source index is the index the `.kibana` alias points to.
### Next action
`createIndex`

Create the target index. This operation is idempotent: if the index already exists, we wait until its status turns yellow.
Create the target index. This operation is idempotent: if the index already exists, we wait until its status turns green.

### New control state
1. If the action succeeds
`MARK_VERSION_INDEX_READY`
2. If the action fails with an `index_not_yellow_timeout`
2. If the action fails with an `index_not_green_timeout`
`CREATE_NEW_TARGET`
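
A rough sketch of the idempotent create-then-wait behaviour described above (not the actual `createIndex` action; the error-shape check and timeouts are assumptions):

```ts
import type { Client } from '@elastic/elasticsearch';

// Illustrative only: create the index if missing, treat "already exists" as
// success, then only report success once the index reaches the wanted status.
async function createIndexIdempotent(client: Client, index: string): Promise<void> {
  try {
    await client.indices.create({ index, wait_for_active_shards: 'all', timeout: '60s' });
  } catch (e: any) {
    // Another Kibana node may have created the index first.
    if (e?.body?.error?.type !== 'resource_already_exists_exception') {
      throw e;
    }
  }
  // Wait for primary and replica to be started before declaring the step done.
  await client.cluster.health({ index, wait_for_status: 'green', timeout: '60s' });
}
```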


@@ -219,7 +219,7 @@ saved objects index in 7.4 it will be reindexed into `.kibana_pre7.4.0_001`)
### New control state
1. If the index creation succeeds
`LEGACY_REINDEX`
2. If the index creation task failed with an `index_not_yellow_timeout`
2. If the index creation task failed with an `index_not_green_timeout`
`LEGACY_REINDEX_WAIT_FOR_TASK`
## LEGACY_REINDEX
### Next action
@@ -261,10 +261,9 @@ new `.kibana` alias that points to `.kibana_pre6.5.0_001`.

## WAIT_FOR_YELLOW_SOURCE
### Next action
`waitForIndexStatusYellow`
`waitForIndexStatus` (status='yellow')

Wait for the Elasticsearch cluster to be in "yellow" state. It means the index's primary shard is allocated and the index is ready for searching/indexing documents, but ES wasn't able to allocate the replicas.
We don't have as much data redundancy as we could have, but it's enough to start the migration.
Wait for the source index to become yellow. This means the index's primary shard has been allocated and is ready for reading/searching. On a multi-node cluster the replicas for this index might not be ready yet, but since we never write to the source index this does not matter.
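
A hedged sketch of how such a status wait can be expressed with the cluster health API scoped to one index (the function shape and timeout are assumptions, not the real `waitForIndexStatus` action):

```ts
import type { Client } from '@elastic/elasticsearch';

// Resolves once the index reaches at least the requested status; a timeout is
// surfaced as an error that a caller could map to e.g. index_not_yellow_timeout.
async function waitForIndexStatusSketch(
  client: Client,
  index: string,
  status: 'yellow' | 'green',
  timeout = '30s'
): Promise<void> {
  const health = await client.cluster.health({ index, wait_for_status: status, timeout });
  if (health.timed_out) {
    throw new Error(`${index} did not reach ${status} status within ${timeout}`);
  }
}
```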

### New control state
1. If the action succeeds
@@ -285,15 +284,15 @@ Set a write block on the source index to prevent any older Kibana instances from
### Next action
`createIndex`

This operation is idempotent: if the index already exists, we wait until its status turns yellow.
This operation is idempotent: if the index already exists, we wait until its status turns green.

- Because we will be transforming documents before writing them into this index, we can already set the mappings to the target mappings for this version. The source index might contain documents belonging to a disabled plugin, so set `dynamic: false` mappings for any unknown saved object types (see the sketch after this list).
- (Since we never query the temporary index we can potentially disable refresh to speed up indexing performance. Profile to see if gains justify complexity)
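
A hedged illustration of the `dynamic: false` root mapping mentioned in the first bullet (the listed properties are placeholders, not the real saved objects mappings):

```ts
import type { estypes } from '@elastic/elasticsearch';

// With dynamic: false, documents of unknown (e.g. disabled-plugin) saved
// object types are still indexed, but their unmapped fields are ignored
// instead of being added to the mappings.
const tempIndexMappings: estypes.MappingTypeMapping = {
  dynamic: false,
  properties: {
    type: { type: 'keyword' },
    updated_at: { type: 'date' },
  },
};
```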

### New control state
1. If the action succeeds
`REINDEX_SOURCE_TO_TEMP_OPEN_PIT`
2. If the action fails with an `index_not_yellow_timeout`
2. If the action fails with an `index_not_green_timeout`
`CREATE_REINDEX_TEMP`

## REINDEX_SOURCE_TO_TEMP_OPEN_PIT
@@ -368,14 +367,14 @@ Set a write block on the temporary index so that we can clone it.
### Next action
`cloneIndex`

Ask Elasticsearch to clone the temporary index into the target index. If the target index already exists (because another node already started the clone operation), wait until the clone is complete by waiting for a yellow index status.
Ask Elasticsearch to clone the temporary index into the target index. If the target index already exists (because another node already started the clone operation), wait until the clone is complete by waiting for a green index status.

We can’t use the temporary index as our target index because one instance could complete the migration, delete a document, and then a second instance could start the reindex operation and re-create the deleted document. By cloning the temporary index and only accepting writes/deletes from the cloned target index, we prevent lost acknowledged deletes.

### New control state
1. If the action succeeds
`OUTDATED_DOCUMENTS_SEARCH`
2. If the action fails with an `index_not_yellow_timeout`
2. If the action fails with an `index_not_green_timeout`
`CLONE_TEMP_TO_TARGET`

## OUTDATED_DOCUMENTS_SEARCH
@@ -44,8 +44,9 @@ export interface UnknownDocsFound
}

/**
* Performs a search in ES, aggregating documents by type,
* retrieving a bunch of documents for each type.
* Performs a search in ES, aggregating documents by type, retrieving a bunch
* of documents for each type.
*
* @internal
* @param esClient The ES client to perform the search query
* @param targetIndices The ES indices to target
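
The doc comment above describes an aggregate-by-type search; a rough sketch of such a query is shown below (the helper, field names and sizes are illustrative assumptions, not the actual implementation):

```ts
import type { Client } from '@elastic/elasticsearch';

// Group documents by their saved object `type` and pull a few sample hits per
// type, e.g. to report which types are present in the target indices.
async function sampleDocsByType(client: Client, targetIndices: string[]) {
  const response = await client.search({
    index: targetIndices,
    size: 0,
    aggs: {
      byType: {
        terms: { field: 'type', size: 100 },
        aggs: {
          docs: { top_hits: { size: 3, _source: ['type'] } },
        },
      },
    },
  });
  return response.aggregations;
}
```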
51 changes: 24 additions & 27 deletions src/core/server/saved_objects/migrations/actions/clone_index.ts
@@ -15,8 +15,8 @@ import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import type { IndexNotFound, AcknowledgeResponse, IndexNotYellowTimeout } from '.';
import { waitForIndexStatusYellow } from './wait_for_index_status_yellow';
import type { IndexNotFound, AcknowledgeResponse } from '.';
import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
import {
DEFAULT_TIMEOUT,
INDEX_AUTO_EXPAND_REPLICAS,
@@ -52,40 +52,37 @@ export const cloneIndex = ({
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | IndexNotYellowTimeout | ClusterShardLimitExceeded,
RetryableEsClientError | IndexNotFound | IndexNotGreenTimeout | ClusterShardLimitExceeded,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded,
AcknowledgeResponse
> = () => {
return client.indices
.clone(
{
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
body: {
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
number_of_shards: INDEX_NUMBER_OF_SHARDS,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
body: {
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
number_of_shards: INDEX_NUMBER_OF_SHARDS,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
},
timeout,
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
)
timeout,
})
.then((response) => {
/**
* - acknowledged=false, we timed out before the cluster state was
@@ -136,7 +133,7 @@ export const cloneIndex = ({
} else {
// Otherwise, wait until the target index has a 'green' status.
return pipe(
waitForIndexStatusYellow({ client, index: target, timeout }),
waitForIndexStatus({ client, index: target, timeout, status: 'green' }),
TaskEither.map((value) => {
/** When the index status is 'green' we know that all shards were started */
return { acknowledged: true, shardsAcknowledged: true };
10 changes: 10 additions & 0 deletions src/core/server/saved_objects/migrations/actions/constants.ts
@@ -11,6 +11,16 @@
* Uses the default value of 1000 for Elasticsearch reindex operation.
*/
export const BATCH_SIZE = 1_000;
/**
* When a request takes a long time to complete and hits the timeout or the
* client aborts that request due to the requestTimeout, our only course of
* action is to retry that request. This places our request at the end of the
* queue and adds more load to Elasticsearch just making things worse.
*
* So we want to choose as long a timeout as possible. Some load balancers /
* reverse proxies like ELB ignore TCP keep-alive packets so unless there's a
* request or response sent over the socket it will be dropped after 60s.
*/
export const DEFAULT_TIMEOUT = '60s';
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
export const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
74 changes: 40 additions & 34 deletions src/core/server/saved_objects/migrations/actions/create_index.ts
@@ -22,7 +22,7 @@ import {
INDEX_AUTO_EXPAND_REPLICAS,
WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
} from './constants';
import { IndexNotYellowTimeout, waitForIndexStatusYellow } from './wait_for_index_status_yellow';
import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
import { isClusterShardLimitExceeded } from './es_errors';

function aliasArrayToRecord(aliases: string[]): Record<string, estypes.IndicesAlias> {
@@ -44,6 +44,7 @@ export interface CreateIndexParams {
indexName: string;
mappings: IndexMapping;
aliases?: string[];
timeout?: string;
}
/**
* Creates an index with the given mappings
@@ -60,8 +61,9 @@ export const createIndex = ({
indexName,
mappings,
aliases = [],
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotYellowTimeout | ClusterShardLimitExceeded,
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
> => {
const createIndexTask: TaskEither.TaskEither<
@@ -71,36 +73,34 @@
const aliasesObject = aliasArrayToRecord(aliases);

return client.indices
.create(
{
index: indexName,
// wait until all shards are available before creating the index
// (since number_of_shards=1 this does not have any effect atm)
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
// Wait up to 60s for the cluster state to update and all shards to be
// started
timeout: DEFAULT_TIMEOUT,
body: {
mappings,
aliases: aliasesObject,
settings: {
index: {
// ES rule of thumb: shards should be several GB to 10's of GB, so
// Kibana is unlikely to cross that limit.
number_of_shards: 1,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
.create({
index: indexName,
// wait up to timeout until the following shards are available before
// creating the index: primary, replica (only on multi node clusters)
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
// Timeout for the cluster state to update and all shards to become
// available. If the request doesn't complete within timeout,
// acknowledged or shards_acknowledged would be false.
timeout,
body: {
mappings,
aliases: aliasesObject,
settings: {
index: {
// ES rule of thumb: shards should be several GB to 10's of GB, so
// Kibana is unlikely to cross that limit.
number_of_shards: 1,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
},
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
)
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
@@ -140,19 +140,25 @@ export const createIndex = ({
return pipe(
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotYellowTimeout | ClusterShardLimitExceeded,
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
>((res) => {
if (res.acknowledged && res.shardsAcknowledged) {
// If the cluster state was updated and all shards ackd we're done
// If the cluster state was updated and all shards started we're done
return TaskEither.right('create_index_succeeded');
} else {
// Otherwise, wait until the target index has a 'yellow' status.
// Otherwise, wait until the target index has a 'green' status meaning
// the primary (and on multi node clusters) the replica has been started
return pipe(
waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }),
waitForIndexStatus({
client,
index: indexName,
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'yellow' we know that all shards were started */
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
);
@@ -34,18 +34,14 @@ export const fetchIndices =
client,
indices,
}: FetchIndicesParams): TaskEither.TaskEither<RetryableEsClientError, FetchIndexResponse> =>
// @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required
() => {
return client.indices
.get(
{
index: indices,
ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607
},
{ maxRetries: 0 }
)
.get({
index: indices,
ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607
})
.then((body) => {
return Either.right(body);
return Either.right(body as FetchIndexResponse);
})
.catch(catchRetryableEsClientErrors);
};
19 changes: 9 additions & 10 deletions src/core/server/saved_objects/migrations/actions/index.ts
@@ -35,11 +35,12 @@ export { removeWriteBlock } from './remove_write_block';
export type { CloneIndexResponse, CloneIndexParams } from './clone_index';
export { cloneIndex } from './clone_index';

export type {
WaitForIndexStatusYellowParams,
IndexNotYellowTimeout,
} from './wait_for_index_status_yellow';
import { IndexNotYellowTimeout, waitForIndexStatusYellow } from './wait_for_index_status_yellow';
export type { WaitForIndexStatusParams, IndexNotYellowTimeout } from './wait_for_index_status';
import {
type IndexNotGreenTimeout,
type IndexNotYellowTimeout,
waitForIndexStatus,
} from './wait_for_index_status';

export type { WaitForTaskResponse, WaitForTaskCompletionTimeout } from './wait_for_task';
import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task';
@@ -48,7 +49,7 @@ export type { UpdateByQueryResponse } from './pickup_updated_mappings';
import { pickupUpdatedMappings } from './pickup_updated_mappings';

export type { OpenPitResponse, OpenPitParams } from './open_pit';
export { openPit, pitKeepAlive } from './open_pit';
export { openPit } from './open_pit';

export type { ReadWithPit, ReadWithPitParams } from './read_with_pit';
export { readWithPit } from './read_with_pit';
@@ -69,9 +70,6 @@ import type { IncompatibleMappingException } from './wait_for_reindex_task';

export { waitForReindexTask } from './wait_for_reindex_task';

export type { VerifyReindexParams } from './verify_reindex';
export { verifyReindex } from './verify_reindex';

import type { AliasNotFound, RemoveIndexNotAConcreteIndex } from './update_aliases';

export type { AliasAction, UpdateAliasesParams } from './update_aliases';
@@ -114,7 +112,7 @@ export type {
} from './calculate_exclude_filters';
export { calculateExcludeFilters } from './calculate_exclude_filters';

export { pickupUpdatedMappings, waitForTask, waitForIndexStatusYellow };
export { pickupUpdatedMappings, waitForTask, waitForIndexStatus };
export type { AliasNotFound, RemoveIndexNotAConcreteIndex };

export interface IndexNotFound {
@@ -153,6 +151,7 @@ export interface ActionErrorTypeMap {
request_entity_too_large_exception: RequestEntityTooLargeException;
unknown_docs_found: UnknownDocsFound;
incompatible_cluster_routing_allocation: IncompatibleClusterRoutingAllocation;
index_not_green_timeout: IndexNotGreenTimeout;
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
}