Skip to content

Commit

Permalink
[Fleet] added back batch exec for update tags (#148618)
Browse files Browse the repository at this point in the history
## Summary

Closes #148233

Fixing issue of bulk update tags not working with the new agent status
runtime field.
Refactored update tags to use batching again, resolving 10k agent ids at
a time and `updateByQuery` on batches.
Works locally with dummy agents, has to be tested on cloud with horde to
simulate conflicts.


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
juliaElastic and kibanamachine authored Jan 24, 2023
1 parent ca815a7 commit ccecb25
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 201 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/server/routes/agent/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export const getAgentsHandler: RequestHandler<
kuery: request.query.kuery,
sortField: request.query.sortField,
sortOrder: request.query.sortOrder,
getTotalInactive: true,
getTotalInactive: request.query.showInactive,
});

const { total, page, perPage, totalInactive = 0 } = agentRes;
Expand Down
24 changes: 11 additions & 13 deletions x-pack/plugins/fleet/server/services/agents/action_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { v4 as uuidv4 } from 'uuid';
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import { withSpan } from '@kbn/apm-utils';

Expand All @@ -21,8 +20,8 @@ import { SO_SEARCH_LIMIT } from '../../../common/constants';
import { getAgentActions } from './actions';
import { closePointInTime, getAgentsByKuery } from './crud';
import type { BulkActionsResolver } from './bulk_actions_resolver';

export const MAX_RETRY_COUNT = 5;
import type { RetryParams } from './retry_helper';
import { getRetryParams, MAX_RETRY_COUNT } from './retry_helper';

export interface ActionParams {
kuery: string;
Expand All @@ -34,13 +33,6 @@ export interface ActionParams {
[key: string]: any;
}

export interface RetryParams {
pitId: string;
searchAfter?: SortResults;
retryCount?: number;
taskId?: string;
}

export abstract class ActionRunner {
protected esClient: ElasticsearchClient;
protected soClient: SavedObjectsClientContract;
Expand Down Expand Up @@ -79,7 +71,9 @@ export abstract class ActionRunner {
appContextService
.getLogger()
.info(
`Running action asynchronously, actionId: ${this.actionParams.actionId}, total agents: ${this.actionParams.total}`
`Running action asynchronously, actionId: ${this.actionParams.actionId}${
this.actionParams.total ? ', total agents:' + this.actionParams.total : ''
}`
);

if (!this.bulkActionsResolver) {
Expand Down Expand Up @@ -153,10 +147,12 @@ export abstract class ActionRunner {
this.actionParams.actionId!,
this.getTaskType() + ':check'
);
const retryParams: RetryParams = getRetryParams(this.getTaskType(), this.retryParams);

return await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
...retryParams,
retryCount: 1,
},
this.getTaskType(),
Expand Down Expand Up @@ -233,7 +229,9 @@ export abstract class ActionRunner {
}
}

await closePointInTime(this.esClient, pitId!);
if (pitId) {
await closePointInTime(this.esClient, pitId!);
}

appContextService
.getLogger()
Expand Down
14 changes: 14 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/bulk_action_types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export enum BulkActionTaskType {
REASSIGN_RETRY = 'fleet:reassign_action:retry',
UNENROLL_RETRY = 'fleet:unenroll_action:retry',
UPGRADE_RETRY = 'fleet:upgrade_action:retry',
UPDATE_AGENT_TAGS_RETRY = 'fleet:update_agent_tags:retry',
REQUEST_DIAGNOSTICS_RETRY = 'fleet:request_diagnostics:retry',
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@ import { ReassignActionRunner } from './reassign_action_runner';
import { UpgradeActionRunner } from './upgrade_action_runner';
import { UpdateAgentTagsActionRunner } from './update_agent_tags_action_runner';
import { UnenrollActionRunner } from './unenroll_action_runner';
import type { ActionParams, RetryParams } from './action_runner';
import type { ActionParams } from './action_runner';
import { RequestDiagnosticsActionRunner } from './request_diagnostics_action_runner';

export enum BulkActionTaskType {
REASSIGN_RETRY = 'fleet:reassign_action:retry',
UNENROLL_RETRY = 'fleet:unenroll_action:retry',
UPGRADE_RETRY = 'fleet:upgrade_action:retry',
UPDATE_AGENT_TAGS_RETRY = 'fleet:update_agent_tags:retry',
REQUEST_DIAGNOSTICS_RETRY = 'fleet:request_diagnostics:retry',
}
import type { RetryParams } from './retry_helper';
import { getRetryParams } from './retry_helper';
import { BulkActionTaskType } from './bulk_action_types';

/**
* Create and run retry tasks of agent bulk actions
Expand Down Expand Up @@ -114,11 +109,7 @@ export class BulkActionsResolver {
scope: ['fleet'],
state: {},
params: { actionParams, retryParams },
runAt:
runAt ??
moment(new Date())
.add(Math.pow(3, retryParams.retryCount ?? 1), 's')
.toDate(),
runAt: runAt ?? moment(new Date()).add(3, 's').toDate(),
});
appContextService.getLogger().info('Scheduling task ' + taskId);
return taskId;
Expand Down Expand Up @@ -146,7 +137,10 @@ export function createRetryTask(

const { esClient, soClient } = await getDeps();

const retryParams = taskInstance.params.retryParams;
const retryParams: RetryParams = getRetryParams(
taskInstance.taskType,
taskInstance.params.retryParams
);

appContextService
.getLogger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { ActionRunner } from './action_runner';
import { bulkUpdateAgents } from './crud';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';
import { BulkActionTaskType } from './bulk_action_types';

export class ReassignActionRunner extends ActionRunner {
protected async processAgents(agents: Agent[]): Promise<{ actionId: string }> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { Agent } from '../../types';

import { ActionRunner } from './action_runner';
import { createAgentAction } from './actions';
import { BulkActionTaskType } from './bulk_actions_resolver';
import { BulkActionTaskType } from './bulk_action_types';

export class RequestDiagnosticsActionRunner extends ActionRunner {
protected async processAgents(agents: Agent[]): Promise<{ actionId: string }> {
Expand Down
30 changes: 30 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/retry_helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { SortResults } from '@elastic/elasticsearch/lib/api/types';

import { BulkActionTaskType } from './bulk_action_types';

export const MAX_RETRY_COUNT = 20;

export interface RetryParams {
pitId?: string;
searchAfter?: SortResults;
retryCount?: number;
taskId?: string;
}

export function getRetryParams(taskType: string, retryParams: RetryParams): RetryParams {
// update tags will retry with tags filter
return taskType === BulkActionTaskType.UPDATE_AGENT_TAGS_RETRY
? {
...retryParams,
pitId: undefined,
searchAfter: undefined,
}
: retryParams;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
getUnenrollAgentActions,
} from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';
import { BulkActionTaskType } from './bulk_action_types';

export class UnenrollActionRunner extends ActionRunner {
protected async processAgents(agents: Agent[]): Promise<{ actionId: string }> {
Expand Down
Loading

0 comments on commit ccecb25

Please sign in to comment.