Skip to content

Commit

Permalink
splitting up stop and delete ations and expose that in the EntityClient
Browse files Browse the repository at this point in the history
  • Loading branch information
tiansivive committed Sep 5, 2024
1 parent df53ef2 commit fe4b833
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { EntityDefinition } from '@kbn/entities-schema';

import {
generateHistoryTransformId,
generateHistoryBackfillTransformId,
generateLatestTransformId,
} from './helpers/generate_component_id';
import { retryTransientEsErrors } from './helpers/retry';

export async function deleteHistoryTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
) {
try {
const historyTransformId = generateHistoryTransformId(definition);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: historyTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot delete history transform [${definition.id}]: ${e}`);
throw e;
}
}

export async function deleteHistoryBackfillTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
) {
try {
const historyBackfillTransformId = generateHistoryBackfillTransformId(definition);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: historyBackfillTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot delete history backfill transform [${definition.id}]: ${e}`);
throw e;
}
}

export async function deleteLatestTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
) {
try {
const latestTransformId = generateLatestTransformId(definition);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: latestTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot delete latest transform [${definition.id}]`);
throw e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ import {
saveEntityDefinition,
updateEntityDefinition,
} from './save_entity_definition';
import {
stopAndDeleteHistoryBackfillTransform,
stopAndDeleteHistoryTransform,
stopAndDeleteLatestTransform,
} from './stop_and_delete_transform';

import { isBackfillEnabled } from './helpers/is_backfill_enabled';
import { deleteTemplate, upsertTemplate } from '../manage_index_templates';
import { generateEntitiesLatestIndexTemplateConfig } from './templates/entities_latest_template';
Expand All @@ -45,6 +41,16 @@ import { EntityIdConflict } from './errors/entity_id_conflict_error';
import { EntityDefinitionNotFound } from './errors/entity_not_found';
import { EntityDefinitionWithState } from './types';
import { mergeEntityDefinitionUpdate } from './helpers/merge_definition_update';
import {
stopHistoryBackfillTransform,
stopHistoryTransform,
stopLatestTransform,
} from './stop_transforms';
import {
deleteHistoryBackfillTransform,
deleteHistoryTransform,
deleteLatestTransform,
} from './delete_transforms';

export interface InstallDefinitionParams {
esClient: ElasticsearchClient;
Expand Down Expand Up @@ -91,14 +97,7 @@ export async function installEntityDefinition({
return await install({ esClient, soClient, logger, definition: entityDefinition });
} catch (e) {
logger.error(`Failed to install entity definition ${definition.id}: ${e}`);

await Promise.all([
stopAndDeleteHistoryTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? stopAndDeleteHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
stopAndDeleteLatestTransform(esClient, definition, logger),
]);
await stopAndDeleteTransforms(esClient, definition, logger);

await Promise.all([
deleteHistoryIngestPipeline(esClient, definition, logger),
Expand Down Expand Up @@ -253,13 +252,7 @@ export async function reinstallEntityDefinition({
});

logger.debug(`Deleting transforms for definition ${definition.id} v${definition.version}`);
await Promise.all([
stopAndDeleteHistoryTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? stopAndDeleteHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
stopAndDeleteLatestTransform(esClient, definition, logger),
]);
await stopAndDeleteTransforms(esClient, definition, logger);

return await install({
esClient,
Expand Down Expand Up @@ -308,3 +301,25 @@ const shouldReinstallBuiltinDefinition = (

return timedOut || outdated || failed || partial;
};

const stopAndDeleteTransforms = async (
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
) => {
await Promise.all([
stopHistoryTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? stopHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
stopLatestTransform(esClient, definition, logger),
]);

await Promise.all([
deleteHistoryTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? deleteHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
deleteLatestTransform(esClient, definition, logger),
]);
};
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@

import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { EntityDefinition } from '@kbn/entities-schema';

import {
generateHistoryBackfillTransformId,
generateHistoryTransformId,
generateHistoryBackfillTransformId,
generateLatestTransformId,
} from './helpers/generate_component_id';
import { retryTransientEsErrors } from './helpers/retry';

export async function stopAndDeleteHistoryTransform(
export async function stopHistoryTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
Expand All @@ -29,21 +30,13 @@ export async function stopAndDeleteHistoryTransform(
),
{ logger }
);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: historyTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot stop or delete history transform [${definition.id}]: ${e}`);
logger.error(`Cannot stop history transform [${definition.id}]: ${e}`);
throw e;
}
}

export async function stopAndDeleteHistoryBackfillTransform(
export async function stopHistoryBackfillTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
Expand All @@ -58,21 +51,13 @@ export async function stopAndDeleteHistoryBackfillTransform(
),
{ logger }
);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: historyBackfillTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot stop or delete history backfill transform [${definition.id}]: ${e}`);
logger.error(`Cannot stop history backfill transform [${definition.id}]: ${e}`);
throw e;
}
}

export async function stopAndDeleteLatestTransform(
export async function stopLatestTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
logger: Logger
Expand All @@ -87,16 +72,8 @@ export async function stopAndDeleteLatestTransform(
),
{ logger }
);
await retryTransientEsErrors(
() =>
esClient.transform.deleteTransform(
{ transform_id: latestTransformId, force: true },
{ ignore: [404] }
),
{ logger }
);
} catch (e) {
logger.error(`Cannot stop or delete latest transform [${definition.id}]`);
logger.error(`Cannot stop latest transform [${definition.id}]`);
throw e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,26 @@ import { deleteEntityDefinition } from './delete_entity_definition';
import { deleteIndices } from './delete_index';
import { deleteHistoryIngestPipeline, deleteLatestIngestPipeline } from './delete_ingest_pipeline';
import { findEntityDefinitions } from './find_entity_definition';
import {
stopAndDeleteHistoryBackfillTransform,
stopAndDeleteHistoryTransform,
stopAndDeleteLatestTransform,
} from './stop_and_delete_transform';

import { isBackfillEnabled } from './helpers/is_backfill_enabled';
import {
generateHistoryIndexTemplateId,
generateLatestIndexTemplateId,
} from './helpers/generate_component_id';
import { deleteTemplate } from '../manage_index_templates';

import {
stopHistoryTransform,
stopLatestTransform,
stopHistoryBackfillTransform,
} from './stop_transforms';

import {
deleteHistoryTransform,
deleteLatestTransform,
deleteHistoryBackfillTransform,
} from './delete_transforms';

export async function uninstallEntityDefinition({
definition,
esClient,
Expand All @@ -39,10 +47,17 @@ export async function uninstallEntityDefinition({
deleteData?: boolean;
}) {
await Promise.all([
stopAndDeleteHistoryTransform(esClient, definition, logger),
stopAndDeleteLatestTransform(esClient, definition, logger),
stopHistoryTransform(esClient, definition, logger),
stopLatestTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? stopHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
]);
await Promise.all([
deleteHistoryTransform(esClient, definition, logger),
deleteLatestTransform(esClient, definition, logger),
isBackfillEnabled(definition)
? stopAndDeleteHistoryBackfillTransform(esClient, definition, logger)
? deleteHistoryBackfillTransform(esClient, definition, logger)
: Promise.resolve(),
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ import { findEntityDefinitions } from './entities/find_entity_definition';
import { uninstallEntityDefinition } from './entities/uninstall_entity_definition';
import { EntityDefinitionNotFound } from './entities/errors/entity_not_found';

import {
stopHistoryTransform,
stopLatestTransform,
stopHistoryBackfillTransform,
} from './entities/stop_transforms';

import { isBackfillEnabled } from './entities/helpers/is_backfill_enabled';

export class EntityClient {
constructor(
private options: {
Expand Down Expand Up @@ -78,4 +86,18 @@ export class EntityClient {

return { definitions };
}

async startTransforms(definition: EntityDefinition) {
return startTransform(this.options.esClient, definition, this.options.logger);
}

async stopTransforms(definition: EntityDefinition) {
return Promise.all([
stopHistoryTransform(this.options.esClient, definition, this.options.logger),
stopLatestTransform(this.options.esClient, definition, this.options.logger),
isBackfillEnabled(definition)
? stopHistoryBackfillTransform(this.options.esClient, definition, this.options.logger)
: Promise.resolve(),
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import { SetupRouteOptions } from '../types';
import { EntitySecurityException } from '../../lib/entities/errors/entity_security_exception';
import { InvalidTransformError } from '../../lib/entities/errors/invalid_transform_error';
import { readEntityDefinition } from '../../lib/entities/read_entity_definition';
import {
stopAndDeleteHistoryBackfillTransform,
stopAndDeleteHistoryTransform,
stopAndDeleteLatestTransform,
} from '../../lib/entities/stop_and_delete_transform';

import {
deleteHistoryIngestPipeline,
deleteLatestIngestPipeline,
Expand All @@ -33,6 +29,16 @@ import {
import { startTransform } from '../../lib/entities/start_transform';
import { EntityDefinitionNotFound } from '../../lib/entities/errors/entity_not_found';
import { isBackfillEnabled } from '../../lib/entities/helpers/is_backfill_enabled';
import {
deleteHistoryBackfillTransform,
deleteHistoryTransform,
deleteLatestTransform,
} from '../../lib/entities/delete_transforms';
import {
stopHistoryBackfillTransform,
stopHistoryTransform,
stopLatestTransform,
} from '../../lib/entities/stop_transforms';

export function resetEntityDefinitionRoute<T extends RequestHandlerContext>({
router,
Expand All @@ -53,11 +59,18 @@ export function resetEntityDefinitionRoute<T extends RequestHandlerContext>({
const definition = await readEntityDefinition(soClient, req.params.id, logger);

// Delete the transform and ingest pipeline
await stopAndDeleteHistoryTransform(esClient, definition, logger);
await stopHistoryTransform(esClient, definition, logger);
await deleteHistoryTransform(esClient, definition, logger);

if (isBackfillEnabled(definition)) {
await stopAndDeleteHistoryBackfillTransform(esClient, definition, logger);
await stopHistoryBackfillTransform(esClient, definition, logger);
await deleteHistoryBackfillTransform(esClient, definition, logger);
// await stopAndDeleteHistoryBackfillTransform(esClient, definition, logger);
}
await stopAndDeleteLatestTransform(esClient, definition, logger);

await stopLatestTransform(esClient, definition, logger);
await deleteLatestTransform(esClient, definition, logger);

await deleteHistoryIngestPipeline(esClient, definition, logger);
await deleteLatestIngestPipeline(esClient, definition, logger);
await deleteIndices(esClient, definition, logger);
Expand Down

0 comments on commit fe4b833

Please sign in to comment.