diff --git a/x-pack/plugins/fleet/server/mocks/index.ts b/x-pack/plugins/fleet/server/mocks/index.ts index fb6dd7d075cea5..857882c57525fe 100644 --- a/x-pack/plugins/fleet/server/mocks/index.ts +++ b/x-pack/plugins/fleet/server/mocks/index.ts @@ -31,6 +31,8 @@ import { packageServiceMock } from '../services/epm/package_service.mock'; import type { UninstallTokenServiceInterface } from '../services/security/uninstall_token_service'; import type { MessageSigningServiceInterface } from '../services/security'; +import { PackagePolicyMocks } from './package_policy.mocks'; + // Export all mocks from artifacts export * from '../services/artifacts/mocks'; @@ -40,6 +42,8 @@ export * from '../services/files/mocks'; // export all mocks from fleet actions client export * from '../services/actions/mocks'; +export * from './package_policy.mocks'; + export interface MockedFleetAppContext extends FleetAppContext { elasticsearch: ReturnType; data: ReturnType; @@ -144,6 +148,22 @@ export const createPackagePolicyServiceMock = (): jest.Mocked { + return { + async *[Symbol.asyncIterator]() { + yield Promise.resolve([PackagePolicyMocks.generatePackagePolicy({ id: '111' })]); + yield Promise.resolve([PackagePolicyMocks.generatePackagePolicy({ id: '222' })]); + }, + }; + }), + fetchAllItemIds: jest.fn((..._) => { + return { + async *[Symbol.asyncIterator]() { + yield Promise.resolve(['111']); + yield Promise.resolve(['222']); + }, + }; + }), }; }; diff --git a/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts b/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts new file mode 100644 index 00000000000000..a159917cb5e175 --- /dev/null +++ b/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsFindResponse } from '@kbn/core-saved-objects-api-server'; + +import type { SavedObjectsFindResult } from '@kbn/core-saved-objects-api-server'; + +import { mapPackagePolicySavedObjectToPackagePolicy } from '../services/package_policies'; + +import type { PackagePolicy } from '../../common'; +import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '../../common'; + +import type { PackagePolicySOAttributes } from '../types'; + +const generatePackagePolicySOAttributesMock = ( + overrides: Partial = {} +): PackagePolicySOAttributes => { + return { + name: `Package Policy 1`, + description: 'Policy for things', + created_at: '2024-01-24T15:21:13.389Z', + created_by: 'elastic', + updated_at: '2024-01-25T15:21:13.389Z', + updated_by: 'user-a', + policy_id: '444-555-666', + enabled: true, + inputs: [], + namespace: 'default', + package: { + name: 'endpoint', + title: 'Elastic Endpoint', + version: '1.0.0', + }, + revision: 1, + is_managed: false, + secret_references: [], + vars: {}, + elasticsearch: { + privileges: { + cluster: [], + }, + }, + agents: 2, + + ...overrides, + }; +}; + +const generatePackagePolicyMock = (overrides: Partial = {}) => { + return { + ...mapPackagePolicySavedObjectToPackagePolicy(generatePackagePolicySavedObjectMock()), + ...overrides, + }; +}; + +const generatePackagePolicySavedObjectMock = ( + soAttributes: PackagePolicySOAttributes = generatePackagePolicySOAttributesMock() +): SavedObjectsFindResult => { + return { + score: 1, + id: 'so-123', + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + version: 'abc', + created_at: soAttributes.created_at, + updated_at: soAttributes.updated_at, + attributes: soAttributes, + references: [], + sort: ['created_at'], + }; +}; + +const generatePackagePolicySavedObjectFindResponseMock = ( + soResults?: PackagePolicySOAttributes[] +): SavedObjectsFindResponse => { + const soList = soResults ?? [ + generatePackagePolicySOAttributesMock(), + generatePackagePolicySOAttributesMock(), + ]; + + return { + saved_objects: soList.map((soAttributes) => { + return { + score: 1, + id: 'so-123', + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + version: 'abc', + created_at: soAttributes.created_at, + updated_at: soAttributes.updated_at, + attributes: soAttributes, + references: [], + sort: ['created_at'], + }; + }), + total: soList.length, + per_page: 10, + page: 1, + pit_id: 'pit-id-1', + }; +}; + +export const PackagePolicyMocks = Object.freeze({ + generatePackagePolicySOAttributes: generatePackagePolicySOAttributesMock, + generatePackagePolicySavedObjectFindResponse: generatePackagePolicySavedObjectFindResponseMock, + generatePackagePolicy: generatePackagePolicyMock, +}); diff --git a/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts b/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts index f3b332a5930fc3..782b044a84697b 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts @@ -11,6 +11,8 @@ import { errors } from '@elastic/elasticsearch'; import type { TransportResult } from '@elastic/elasticsearch'; +import { set } from '@kbn/safer-lodash-set'; + import { FLEET_SERVER_ARTIFACTS_INDEX } from '../../../common'; import { ArtifactsElasticsearchError } from '../../errors'; @@ -33,12 +35,14 @@ import { createArtifact, deleteArtifact, encodeArtifactContent, + fetchAllArtifacts, generateArtifactContentHash, getArtifact, listArtifacts, } from './artifacts'; import type { NewArtifact } from './types'; +import type { FetchAllArtifactsOptions } from './types'; describe('When using the artifacts services', () => { let esClientMock: ReturnType; @@ -324,8 +328,28 @@ describe('When using the artifacts services', () => { newArtifact, ]); - expect(responseErrors).toEqual([new Error('error')]); - expect(artifacts).toBeUndefined(); + expect(responseErrors).toEqual([ + new Error( + 'Create of artifact id [undefined] returned: result [undefined], status [400], reason [{"reason":"error"}]' + ), + ]); + expect(artifacts).toEqual([ + { + body: 'eJyrVkrNKynKTC1WsoqOrQUAJxkFKQ==', + compressionAlgorithm: 'zlib', + created: expect.any(String), + decodedSha256: 'd801aa1fb', + decodedSize: 14, + encodedSha256: 'd29238d40', + encodedSize: 22, + encryptionAlgorithm: 'none', + id: 'endpoint:trustlist-v1-d801aa1fb', + identifier: 'trustlist-v1', + packageName: 'endpoint', + relative_url: '/api/fleet/artifacts/trustlist-v1/d801aa1fb', + type: 'trustlist', + }, + ]); }); }); @@ -488,4 +512,113 @@ describe('When using the artifacts services', () => { }); }); }); + + describe('and calling `fetchAll()`', () => { + beforeEach(() => { + esClientMock.search + .mockResolvedValueOnce(generateArtifactEsSearchResultHitsMock()) + .mockResolvedValueOnce(generateArtifactEsSearchResultHitsMock()) + .mockResolvedValueOnce(set(generateArtifactEsSearchResultHitsMock(), 'hits.hits', [])); + }); + + it('should return an iterator', async () => { + expect(fetchAllArtifacts(esClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide artifacts on each iteration', async () => { + for await (const artifacts of fetchAllArtifacts(esClientMock)) { + expect(artifacts[0]).toEqual({ + body: expect.anything(), + compressionAlgorithm: expect.anything(), + created: expect.anything(), + decodedSha256: expect.anything(), + decodedSize: expect.anything(), + encodedSha256: expect.anything(), + encodedSize: expect.anything(), + encryptionAlgorithm: expect.anything(), + id: expect.anything(), + identifier: expect.anything(), + packageName: expect.anything(), + relative_url: expect.anything(), + type: expect.anything(), + }); + } + + expect(esClientMock.search).toHaveBeenCalledTimes(3); + }); + + it('should use defaults if no `options` were provided', async () => { + for await (const artifacts of fetchAllArtifacts(esClientMock)) { + expect(artifacts.length).toBeGreaterThan(0); + } + + expect(esClientMock.search).toHaveBeenLastCalledWith( + expect.objectContaining({ + q: '', + size: 1000, + sort: [{ created: { order: 'asc' } }], + _source_excludes: undefined, + }) + ); + }); + + it('should use custom options when provided', async () => { + const options: FetchAllArtifactsOptions = { + kuery: 'foo: something', + sortOrder: 'desc', + perPage: 500, + sortField: 'someField', + includeArtifactBody: false, + }; + + for await (const artifacts of fetchAllArtifacts(esClientMock, options)) { + expect(artifacts.length).toBeGreaterThan(0); + } + + expect(esClientMock.search).toHaveBeenCalledWith( + expect.objectContaining({ + q: options.kuery, + size: options.perPage, + sort: [{ [options.sortField!]: { order: options.sortOrder } }], + _source_excludes: 'body', + }) + ); + }); + + it('should set `done` to true if loop `break`s out', async () => { + const iterator = fetchAllArtifacts(esClientMock); + + for await (const _ of iterator) { + break; + } + + await expect(iterator[Symbol.asyncIterator]().next()).resolves.toEqual({ + done: true, + value: expect.any(Array), + }); + + expect(esClientMock.search).toHaveBeenCalledTimes(1); + }); + + it('should handle throwing in loop by setting `done` to `true`', async () => { + const iterator = fetchAllArtifacts(esClientMock); + + try { + for await (const _ of iterator) { + throw new Error('test'); + } + } catch (e) { + expect(e); // just to silence eslint + } + + await expect(iterator[Symbol.asyncIterator]().next()).resolves.toEqual({ + done: true, + value: expect.any(Array), + }); + + expect(esClientMock.search).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts b/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts index 5516ab6f70e23a..43cf3f745cc6c1 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts @@ -15,6 +15,8 @@ import { isEmpty, sortBy } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core/server'; +import { createEsSearchIterable } from '../utils/create_es_search_iterable'; + import type { ListResult } from '../../../common/types'; import { FLEET_SERVER_ARTIFACTS_INDEX } from '../../../common'; @@ -34,6 +36,7 @@ import type { ArtifactsClientCreateOptions, ListArtifactsProps, NewArtifact, + FetchAllArtifactsOptions, } from './types'; import { esSearchHitToArtifact, @@ -137,10 +140,10 @@ export const bulkCreateArtifacts = async ( artifacts, appContextService.getConfig()?.createArtifactsBulkBatchSize ); - const logger = appContextService.getLogger(); const nonConflictErrors = []; logger.debug(`Number of batches generated for fleet artifacts: ${batches.length}`); + for (let batchN = 0; batchN < batches.length; batchN++) { logger.debug( `Creating artifacts for batch ${batchN + 1} with ${batches[batchN].length / 2} artifacts` @@ -154,12 +157,22 @@ export const bulkCreateArtifacts = async ( refresh, }) ); + // Track errors of the bulk create action if (res.errors) { nonConflictErrors.push( ...res.items.reduce((acc, item) => { - if (item.create?.status !== 409) { - acc.push(new Error(item.create?.error?.reason)); + // 409's (conflict - record already exists) are ignored since the artifact already exists + if (item.create && item.create.status !== 409) { + acc.push( + new Error( + `Create of artifact id [${item.create._id}] returned: result [${ + item.create.result + }], status [${item.create.status}], reason [${JSON.stringify( + item.create?.error || '' + )}]` + ) + ); } return acc; }, []) @@ -167,11 +180,6 @@ export const bulkCreateArtifacts = async ( } } - // If any non conflict error, it returns only the errors - if (nonConflictErrors.length > 0) { - return { errors: nonConflictErrors }; - } - // Use non sorted artifacts array to preserve the artifacts order in the response const nonSortedEsArtifactsResponse: Artifact[] = artifacts.map((artifact) => { return esSearchHitToArtifact({ @@ -182,6 +190,7 @@ export const bulkCreateArtifacts = async ( return { artifacts: nonSortedEsArtifactsResponse, + errors: nonConflictErrors.length ? nonConflictErrors : undefined, }; }; @@ -281,3 +290,66 @@ export const encodeArtifactContent = async ( return encodedArtifact; }; + +/** + * Returns an iterator that loops through all the artifacts stored in the index + * + * @param esClient + * @param options + * + * @example + * + * async () => { + * for await (const value of fetchAllArtifactsIterator()) { + * // process page of data here + * } + * } + */ +export const fetchAllArtifacts = ( + esClient: ElasticsearchClient, + options: FetchAllArtifactsOptions = {} +): AsyncIterable => { + const { kuery = '', perPage = 1000, sortField, sortOrder, includeArtifactBody = true } = options; + + return createEsSearchIterable({ + esClient, + searchRequest: { + index: FLEET_SERVER_ARTIFACTS_INDEX, + rest_total_hits_as_int: true, + track_total_hits: false, + q: kuery, + size: perPage, + sort: [ + { + // MUST have a sort field and sort order + [sortField || 'created']: { + order: sortOrder || 'asc', + }, + }, + ], + _source_excludes: includeArtifactBody ? undefined : 'body', + }, + resultsMapper: (data): Artifact[] => { + return data.hits.hits.map((hit) => { + // @ts-expect-error @elastic/elasticsearch _source is optional + const artifact = esSearchHitToArtifact(hit); + + // If not body attribute is included, still create the property in the object (since the + // return type is `Artifact` and `body` is required), but throw an error is caller attempts + // to still access it. + if (!includeArtifactBody) { + Object.defineProperty(artifact, 'body', { + enumerable: false, + get(): string { + throw new Error( + `'body' attribute not included due to request to 'fetchAllArtifacts()' having options 'includeArtifactBody' set to 'false'` + ); + }, + }); + } + + return artifact; + }); + }, + }); +}; diff --git a/x-pack/plugins/fleet/server/services/artifacts/client.ts b/x-pack/plugins/fleet/server/services/artifacts/client.ts index 7ba2452e83fe74..0b40a7acdcc8d5 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/client.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/client.ts @@ -17,6 +17,7 @@ import type { ArtifactsClientInterface, NewArtifact, ListArtifactsProps, + FetchAllArtifactsOptions, } from './types'; import { relativeDownloadUrlFromArtifact, uniqueIdFromId } from './mappings'; @@ -29,6 +30,7 @@ import { listArtifacts, bulkCreateArtifacts, bulkDeleteArtifacts, + fetchAllArtifacts, } from './artifacts'; /** @@ -49,6 +51,15 @@ export class FleetArtifactsClient implements ArtifactsClientInterface { return artifact; } + /** + * Creates a `kuery` string using the provided value on input that is bound to the integration package + * @param kuery + * @private + */ + private buildFilter(kuery: string): string { + return `(package_name: "${this.packageName}")${kuery ? ` AND ${kuery}` : ''}`; + } + async getArtifact(id: string): Promise { const artifact = await getArtifact(this.esClient, id); return artifact ? this.validate(artifact) : undefined; @@ -119,20 +130,37 @@ export class FleetArtifactsClient implements ArtifactsClientInterface { } /** - * Get a list of artifacts. - * NOTE that when using the `kuery` filtering param, that all filters property names should - * match the internal attribute names of the index + * Get a list of artifacts. A few things to note: + * - if wanting to get ALL artifacts, consider using instead the `fetchAll()` method instead + * as it will property return data past the 10k ES limitation + * - when using the `kuery` filtering param, all filters property names should match the + * internal attribute names in the index */ async listArtifacts({ kuery, ...options }: ListArtifactsProps = {}): Promise< ListResult > { - // All filtering for artifacts should be bound to the `packageName`, so we insert - // that into the KQL value and use `AND` to add the defined `kuery` (if any) to it. - const filter = `(package_name: "${this.packageName}")${kuery ? ` AND ${kuery}` : ''}`; - return listArtifacts(this.esClient, { ...options, - kuery: filter, + kuery: this.buildFilter(kuery), + }); + } + + /** + * Returns an `AsyncIterable` object that can be used to iterate over all artifacts + * + * @param options + * + * @example + * async () => { + * for await (const artifacts of fleetArtifactsClient.fetchAll()) { + * // artifacts === first page of items + * } + * } + */ + fetchAll({ kuery, ...options }: FetchAllArtifactsOptions = {}): AsyncIterable { + return fetchAllArtifacts(this.esClient, { + ...options, + kuery: this.buildFilter(kuery), }); } diff --git a/x-pack/plugins/fleet/server/services/artifacts/mocks.ts b/x-pack/plugins/fleet/server/services/artifacts/mocks.ts index dc831558cb7bb2..4e5d8c93f06436 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/mocks.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/mocks.ts @@ -44,6 +44,34 @@ export const createArtifactsClientMock = (): jest.Mocked { + return createFetchAllArtifactsIterableMock(); + }), + }; +}; + +export const createFetchAllArtifactsIterableMock = (artifactPages: Artifact[][] = []) => { + const totalPagesOfResults = artifactPages.length; + let nextResults = 0; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + return { + value: artifactPages[nextResults++] ?? [], + done: nextResults > totalPagesOfResults, + }; + }, + + async return() { + return { + value: [], + done: true, + }; + }, + }; + }, }; }; @@ -100,6 +128,7 @@ export const generateArtifactEsGetSingleHitMock = ( _version: 1, _score: 1, _source, + sort: ['abc'], }; }; diff --git a/x-pack/plugins/fleet/server/services/artifacts/types.ts b/x-pack/plugins/fleet/server/services/artifacts/types.ts index 4b0aacd92bc20a..697815a593fdd7 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/types.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/types.ts @@ -72,6 +72,12 @@ export type ListArtifactsProps = Pick & { + sortField?: string | keyof ArtifactElasticsearchProperties; + /** If false, then the `body` property of the Artifact will be excluded from the results. Default is `true` */ + includeArtifactBody?: boolean; +}; + /** * The interface exposed out of Fleet's Artifact service via the client class */ @@ -93,4 +99,6 @@ export interface ArtifactsClientInterface { encodeContent(content: ArtifactsClientCreateOptions['content']): Promise; generateHash(content: string): string; + + fetchAll(options?: FetchAllArtifactsOptions): AsyncIterable; } diff --git a/x-pack/plugins/fleet/server/services/package_policies/index.ts b/x-pack/plugins/fleet/server/services/package_policies/index.ts index d0d4fa4aae8250..a7eacdc76a3a7f 100644 --- a/x-pack/plugins/fleet/server/services/package_policies/index.ts +++ b/x-pack/plugins/fleet/server/services/package_policies/index.ts @@ -7,3 +7,4 @@ export * from './experimental_datastream_features'; export * from './package_policy_name_helper'; +export * from './utils'; diff --git a/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts b/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts new file mode 100644 index 00000000000000..363ffe9c38fa47 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PackagePolicyMocks } from '../../mocks'; + +import { mapPackagePolicySavedObjectToPackagePolicy } from './utils'; + +describe('Package Policy Utils', () => { + describe('mapPackagePolicySavedObjectToPackagePolicy()', () => { + it('should return only exposed SO properties', () => { + const soItem = + PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse().saved_objects.at(0)!; + + expect(mapPackagePolicySavedObjectToPackagePolicy(soItem)).toEqual({ + agents: 2, + created_at: '2024-01-24T15:21:13.389Z', + created_by: 'elastic', + description: 'Policy for things', + elasticsearch: { + privileges: { + cluster: [], + }, + }, + enabled: true, + id: 'so-123', + inputs: [], + is_managed: false, + name: 'Package Policy 1', + namespace: 'default', + package: { + name: 'endpoint', + title: 'Elastic Endpoint', + version: '1.0.0', + }, + policy_id: '444-555-666', + revision: 1, + secret_references: [], + updated_at: '2024-01-25T15:21:13.389Z', + updated_by: 'user-a', + vars: {}, + version: 'abc', + }); + }); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/package_policies/utils.ts b/x-pack/plugins/fleet/server/services/package_policies/utils.ts new file mode 100644 index 00000000000000..309db211bbf142 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/package_policies/utils.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObject } from '@kbn/core-saved-objects-common/src/server_types'; + +import type { PackagePolicy, PackagePolicySOAttributes } from '../../types'; + +export const mapPackagePolicySavedObjectToPackagePolicy = ({ + /* eslint-disable @typescript-eslint/naming-convention */ + id, + version, + attributes: { + name, + description, + namespace, + enabled, + is_managed, + policy_id, + // `package` is a reserved keyword + package: packageInfo, + inputs, + vars, + elasticsearch, + agents, + revision, + secret_references, + updated_at, + updated_by, + created_at, + created_by, + /* eslint-enable @typescript-eslint/naming-convention */ + }, +}: SavedObject): PackagePolicy => { + return { + id, + name, + description, + namespace, + enabled, + is_managed, + policy_id, + package: packageInfo, + inputs, + vars, + elasticsearch, + version, + agents, + revision, + secret_references, + updated_at, + updated_by, + created_at, + created_by, + }; +}; diff --git a/x-pack/plugins/fleet/server/services/package_policy.test.ts b/x-pack/plugins/fleet/server/services/package_policy.test.ts index 24483be93a9f5b..cc605900c3a585 100644 --- a/x-pack/plugins/fleet/server/services/package_policy.test.ts +++ b/x-pack/plugins/fleet/server/services/package_policy.test.ts @@ -19,6 +19,8 @@ import type { } from '@kbn/core/server'; import { SavedObjectsErrorHelpers } from '@kbn/core/server'; +import { PackagePolicyMocks } from '../mocks/package_policy.mocks'; + import type { PackageInfo, PackagePolicySOAttributes, @@ -53,6 +55,8 @@ import { import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '../constants'; +import { mapPackagePolicySavedObjectToPackagePolicy } from './package_policies'; + import { preconfigurePackageInputs, updatePackageInputs, @@ -4918,6 +4922,149 @@ describe('Package policy service', () => { ).rejects.toEqual(new FleetError('Package notinstalled is not installed')); }); }); + + describe('fetchAllItemIds()', () => { + let soClientMock: ReturnType; + + beforeEach(() => { + soClientMock = savedObjectsClientMock.create(); + + soClientMock.find + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce( + Object.assign(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse(), { + saved_objects: [], + }) + ); + }); + + it('should return an iterator', async () => { + expect(packagePolicyService.fetchAllItemIds(soClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide item ids on every iteration', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids).toEqual(['so-123', 'so-123']); + } + + expect(soClientMock.find).toHaveBeenCalledTimes(3); + }); + + it('should use default options', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 1000, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: undefined, + }) + ); + }); + + it('should use custom options when defined', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock, { + perPage: 13, + kuery: 'one=two', + })) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 13, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: 'one=two', + }) + ); + }); + }); + + describe('fetchAllItems()', () => { + let soClientMock: ReturnType; + + beforeEach(() => { + soClientMock = savedObjectsClientMock.create(); + + soClientMock.find + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce( + Object.assign(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse(), { + saved_objects: [], + }) + ); + }); + + it('should return an iterator', async () => { + expect(packagePolicyService.fetchAllItems(soClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide items on every iteration', async () => { + for await (const items of packagePolicyService.fetchAllItems(soClientMock)) { + expect(items).toEqual( + PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse().saved_objects.map( + (soItem) => { + return mapPackagePolicySavedObjectToPackagePolicy(soItem); + } + ) + ); + } + + expect(soClientMock.find).toHaveBeenCalledTimes(3); + }); + + it('should use default options', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 1000, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: undefined, + }) + ); + }); + + it('should use custom options when defined', async () => { + for await (const ids of packagePolicyService.fetchAllItems(soClientMock, { + kuery: 'one=two', + perPage: 12, + sortOrder: 'desc', + sortField: 'updated_by', + })) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 12, + sortField: 'updated_by', + sortOrder: 'desc', + filter: 'one=two', + }) + ); + }); + }); }); describe('getUpgradeDryRunDiff', () => { diff --git a/x-pack/plugins/fleet/server/services/package_policy.ts b/x-pack/plugins/fleet/server/services/package_policy.ts index e89ac0160f62c7..45753540af2565 100644 --- a/x-pack/plugins/fleet/server/services/package_policy.ts +++ b/x-pack/plugins/fleet/server/services/package_policy.ts @@ -95,6 +95,8 @@ import type { } from '../types'; import type { ExternalCallback } from '..'; +import { createSoFindIterable } from './utils/create_so_find_iterable'; + import type { FleetAuthzRouteConfig } from './security'; import { getAuthzFromRequest, doesNotHaveRequiredFleetAuthz } from './security'; @@ -109,9 +111,16 @@ import { appContextService } from '.'; import { removeOldAssets } from './epm/packages/cleanup'; import type { PackageUpdateEvent, UpdateEventType } from './upgrade_sender'; import { sendTelemetryEvents } from './upgrade_sender'; -import { handleExperimentalDatastreamFeatureOptIn } from './package_policies'; +import { + handleExperimentalDatastreamFeatureOptIn, + mapPackagePolicySavedObjectToPackagePolicy, +} from './package_policies'; import { updateDatastreamExperimentalFeatures } from './epm/packages/update'; -import type { PackagePolicyClient, PackagePolicyService } from './package_policy_service'; +import type { + PackagePolicyClient, + PackagePolicyClientFetchAllItemsOptions, + PackagePolicyService, +} from './package_policy_service'; import { installAssetsForInputPackagePolicy } from './epm/packages/install'; import { auditLoggingService } from './audit_logging'; import { @@ -122,6 +131,7 @@ import { } from './secrets'; import { getPackageAssetsMap } from './epm/packages/get'; import { validateOutputForNewPackagePolicy } from './agent_policies/outputs_helpers'; +import type { PackagePolicyClientFetchAllItemIdsOptions } from './package_policy_service'; export type InputsOverride = Partial & { vars?: Array; @@ -1886,6 +1896,60 @@ class PackagePolicyClientImpl implements PackagePolicyClient { } } } + + fetchAllItemIds( + soClient: SavedObjectsClientContract, + { perPage = 1000, kuery }: PackagePolicyClientFetchAllItemIdsOptions = {} + ): AsyncIterable { + // TODO:PT Question for fleet team: do I need to `auditLoggingService.writeCustomSoAuditLog()` here? Its only IDs + + return createSoFindIterable<{}>({ + soClient, + findRequest: { + type: SAVED_OBJECT_TYPE, + perPage, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined, + }, + resultsMapper: (data) => { + return data.saved_objects.map((packagePolicySO) => packagePolicySO.id); + }, + }); + } + + fetchAllItems( + soClient: SavedObjectsClientContract, + { + perPage = 1000, + kuery, + sortOrder = 'asc', + sortField = 'created_at', + }: PackagePolicyClientFetchAllItemsOptions = {} + ): AsyncIterable { + return createSoFindIterable({ + soClient, + findRequest: { + type: SAVED_OBJECT_TYPE, + sortField, + sortOrder, + perPage, + filter: kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined, + }, + resultsMapper(data) { + return data.saved_objects.map((packagePolicySO) => { + auditLoggingService.writeCustomSoAuditLog({ + action: 'find', + id: packagePolicySO.id, + savedObjectType: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + }); + + return mapPackagePolicySavedObjectToPackagePolicy(packagePolicySO); + }); + }, + }); + } } export class PackagePolicyServiceImpl diff --git a/x-pack/plugins/fleet/server/services/package_policy_service.ts b/x-pack/plugins/fleet/server/services/package_policy_service.ts index 9519cafbc6a736..de960c44b7879e 100644 --- a/x-pack/plugins/fleet/server/services/package_policy_service.ts +++ b/x-pack/plugins/fleet/server/services/package_policy_service.ts @@ -213,4 +213,31 @@ export interface PackagePolicyClient { packageInfo: PackageInfo; experimentalDataStreamFeatures: ExperimentalDataStreamFeature[]; }>; + + /** + * Returns an `AsyncIterable` for retrieving all integration policy IDs + * @param soClient + * @param options + */ + fetchAllItemIds( + soClient: SavedObjectsClientContract, + options?: PackagePolicyClientFetchAllItemIdsOptions + ): AsyncIterable; + + /** + * Returns an `AsyncIterable` for retrieving all integration policies + * @param soClient + * @param options + */ + fetchAllItems( + soClient: SavedObjectsClientContract, + options?: PackagePolicyClientFetchAllItemsOptions + ): AsyncIterable; } + +export type PackagePolicyClientFetchAllItemIdsOptions = Pick; + +export type PackagePolicyClientFetchAllItemsOptions = Pick< + ListWithKuery, + 'perPage' | 'kuery' | 'sortField' | 'sortOrder' +>; diff --git a/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts b/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts new file mode 100644 index 00000000000000..ae4cb9551bc8c5 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; + +import type * as estypes from '@kbn/es-types'; + +import type { SearchRequest, SearchResponse } from '@elastic/elasticsearch/lib/api/types'; + +export interface CreateEsSearchIterableOptions { + esClient: ElasticsearchClient; + searchRequest: Omit & + Pick, 'sort' | 'index'>; + /** + * An optional callback for mapping the results retrieved from ES. If defined, the iterator + * `value` will be set to the data returned by this mapping function. + * + * @param data + */ + resultsMapper?: (data: SearchResponse) => any; + /** If a Point in Time should be used while executing the search. Defaults to `true` */ + usePointInTime?: boolean; +} + +export type InferEsSearchIteratorResultValue = + CreateEsSearchIterableOptions['resultsMapper'] extends undefined + ? SearchResponse + : ReturnType>['resultsMapper']>; + +/** + * Creates an `AsyncIterable` that can be used to iterate (ex. via `for..await..of`) over all the data + * matching the search query. The search request to ES will use `search_after`, thus can iterate over + * datasets above 10k items as well. + * + * @param options + * + * @example + * + * const yourFn = async () => { + * const dataIterable = createEsSearchIterable({ + * esClient, + * searchRequest: { + * index: 'some-index', + * sort: [ + * { + * created: { order: 'asc' } + * } + * ] + * } + * }); + * + * for await (const data of dataIterable) { + * // data === your search results + * } + * } + */ +export const createEsSearchIterable = ({ + esClient, + searchRequest: { size = 1000, index, ...searchOptions }, + resultsMapper, + usePointInTime = true, +}: CreateEsSearchIterableOptions): AsyncIterable< + InferEsSearchIteratorResultValue +> => { + const keepAliveValue = '5m'; + let done = false; + let value: SearchResponse; + let searchAfterValue: estypes.SearchHit['sort'] | undefined; + let pointInTime: Promise<{ id: string }> = usePointInTime + ? esClient.openPointInTime({ + index, + ignore_unavailable: true, + keep_alive: keepAliveValue, + }) + : Promise.resolve({ id: '' }); + + const createIteratorResult = (): IteratorResult> => { + return { done, value }; + }; + + const setValue = (searchResponse: SearchResponse): void => { + value = resultsMapper ? resultsMapper(searchResponse) : searchResponse; + }; + + const setDone = async (): Promise => { + done = true; + + if (usePointInTime) { + const pitId = (await pointInTime).id; + + if (pitId) { + await esClient.closePointInTime({ id: pitId }); + } + } + }; + + const fetchData = async () => { + const pitId = (await pointInTime).id; + + const searchResult = await esClient + .search({ + ...searchOptions, + size, + ...(usePointInTime + ? { + pit: { + id: pitId, + keep_alive: keepAliveValue, + }, + } + : { index }), + search_after: searchAfterValue, + }) + .catch((e) => { + Error.captureStackTrace(e); + throw e; + }); + + const searchHits = searchResult.hits.hits; + const lastSearchHit = searchHits[searchHits.length - 1]; + + if (searchHits.length === 0) { + await setDone(); + return; + } + + searchAfterValue = lastSearchHit.sort; + pointInTime = Promise.resolve({ id: searchResult.pit_id ?? '' }); + setValue(searchResult); + + // If (for some reason) we don't have a `searchAfterValue`, + // then throw an error, or else we'll keep looping forever + if (!searchAfterValue) { + await setDone(); + throw new Error( + `Unable to store 'search_after' value. Last 'SearchHit' did not include a 'sort' property \n(did you forget to set the 'sort' attribute on your SearchRequest?)':\n${JSON.stringify( + lastSearchHit + )}` + ); + } + }; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + if (!done) { + await fetchData(); + } + + return createIteratorResult(); + }, + + async return() { + done = true; + return createIteratorResult(); + }, + }; + }, + }; +}; diff --git a/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts b/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts new file mode 100644 index 00000000000000..6b17b3ba980402 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { + SavedObjectsClientContract, + SavedObjectsFindOptions, + SavedObjectsFindResponse, + SavedObjectsFindResult, +} from '@kbn/core-saved-objects-api-server'; + +export interface CreateSoFindIterableOptions { + soClient: SavedObjectsClientContract; + findRequest: Omit & + // sortField is required + Pick, 'sortField'>; + /** + * An optional callback for mapping the results retrieved from SavedObjects. If defined, the iterator + * `value` will be set to the data returned by this mapping function. + * + * @param data + */ + resultsMapper?: (data: SavedObjectsFindResponse) => any; + /** If a Point in Time should be used while executing the search. Defaults to `true` */ + usePointInTime?: boolean; +} + +export type InferSoFindIteratorResultValue = + CreateSoFindIterableOptions['resultsMapper'] extends undefined + ? SavedObjectsFindResponse + : ReturnType>['resultsMapper']>; + +/** + * Creates an `AsyncIterable` that can be used to iterate (ex. via `for..await..of`) over all the data + * matching the search query. The search request to Saved Object will use `searchAfter`, thus can iterate over + * datasets above 10k items as well. + * + * @param options + */ +export const createSoFindIterable = ({ + soClient, + findRequest: { perPage = 1000, ...findOptions }, + resultsMapper, + usePointInTime = true, +}: CreateSoFindIterableOptions): AsyncIterable< + InferSoFindIteratorResultValue +> => { + const keepAliveValue = '5m'; + let done = false; + let value: SavedObjectsFindResponse; + let searchAfterValue: SavedObjectsFindResult['sort'] | undefined; + let pointInTime: Promise<{ id: string }> = usePointInTime + ? soClient.openPointInTimeForType(findOptions.type, { keepAlive: keepAliveValue }) + : Promise.resolve({ id: '' }); + + const setValue = (findResponse: SavedObjectsFindResponse): void => { + value = resultsMapper ? resultsMapper(findResponse) : findResponse; + }; + + const setDone = async (): Promise => { + done = true; + + if (usePointInTime) { + const pitId = (await pointInTime).id; + + if (pitId) { + await soClient.closePointInTime(pitId); + } + } + }; + + const fetchData = async () => { + const findResult = await soClient + .find({ + ...findOptions, + ...(usePointInTime + ? { + pit: { + id: (await pointInTime).id, + keepAlive: keepAliveValue, + }, + } + : {}), + perPage, + searchAfter: searchAfterValue, + }) + .catch((e) => { + Error.captureStackTrace(e); + throw e; + }); + + const soItems = findResult.saved_objects; + const lastSearchHit = soItems[soItems.length - 1]; + + if (soItems.length === 0) { + setValue(findResult); + await setDone(); + return; + } + + searchAfterValue = lastSearchHit.sort; + pointInTime = Promise.resolve({ id: findResult.pit_id ?? '' }); + setValue(findResult); + + // If (for some reason) we don't have a `searchAfterValue`, + // then throw an error, or else we'll keep looping forever + if (!searchAfterValue) { + await setDone(); + throw new Error( + `Unable to store 'searchAfter' value. Last 'SavedObjectsFindResult' did not include a 'sort' property \n(did you forget to set the 'sortField' attribute on your SavedObjectsFindOptions?)':\n${JSON.stringify( + lastSearchHit + )}` + ); + } + }; + + const createIteratorResult = (): IteratorResult> => { + return { done, value }; + }; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + if (!done) { + await fetchData(); + } + + return createIteratorResult(); + }, + + async return() { + done = true; + return createIteratorResult(); + }, + }; + }, + }; +}; diff --git a/x-pack/plugins/security_solution/common/endpoint/errors.ts b/x-pack/plugins/security_solution/common/endpoint/errors.ts index 495afaa126e2b6..2cd7fd931583b5 100644 --- a/x-pack/plugins/security_solution/common/endpoint/errors.ts +++ b/x-pack/plugins/security_solution/common/endpoint/errors.ts @@ -14,5 +14,9 @@ export class EndpointError extends Error { super(message); // For debugging - capture name of subclasses this.name = this.constructor.name; + + if (meta instanceof Error) { + this.stack += `\n----- original error -----\n${meta.stack}`; + } } } diff --git a/x-pack/plugins/security_solution/server/config.mock.ts b/x-pack/plugins/security_solution/server/config.mock.ts index 855cec11ab16eb..9f61523dbbe8b7 100644 --- a/x-pack/plugins/security_solution/server/config.mock.ts +++ b/x-pack/plugins/security_solution/server/config.mock.ts @@ -22,6 +22,7 @@ export const createMockConfig = (): ConfigType => { maxTimelineImportPayloadBytes: 10485760, enableExperimental, packagerTaskInterval: '60s', + packagerTaskTimeout: '5m', packagerTaskPackagePolicyUpdateBatchSize: 10, prebuiltRulesPackageVersion: '', alertMergeStrategy: 'missingFields', diff --git a/x-pack/plugins/security_solution/server/config.ts b/x-pack/plugins/security_solution/server/config.ts index adc8fbfb1174c5..4cb9ff479fff11 100644 --- a/x-pack/plugins/security_solution/server/config.ts +++ b/x-pack/plugins/security_solution/server/config.ts @@ -92,14 +92,20 @@ export const configSchema = schema.object({ }), /** - * Artifacts Configuration + * Endpoint Artifacts Configuration: the interval between runs of the task that builds the + * artifacts and associated manifest. */ packagerTaskInterval: schema.string({ defaultValue: '60s' }), + /** + * Endpoint Artifacts Configuration: timeout value for how long the task should run. + */ + packagerTaskTimeout: schema.string({ defaultValue: '20m' }), + /** * Artifacts Configuration for package policy update concurrency */ - packagerTaskPackagePolicyUpdateBatchSize: schema.number({ defaultValue: 10, max: 50, min: 1 }), + packagerTaskPackagePolicyUpdateBatchSize: schema.number({ defaultValue: 25, max: 50, min: 1 }), /** * For internal use. Specify which version of the Detection Rules fleet package to install diff --git a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts index a0ad1f9712be1e..a1988cb7a13aef 100644 --- a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts +++ b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts @@ -19,7 +19,7 @@ import { getMockArtifacts } from './mocks'; import { InvalidInternalManifestError } from '../../services/artifacts/errors'; import { loggingSystemMock } from '@kbn/core/server/mocks'; -describe('task', () => { +describe('Endpoint artifact packager task', () => { const MOCK_TASK_INSTANCE = { id: `${ManifestTaskConstants.TYPE}:1.0.0`, runAt: new Date(), @@ -170,7 +170,7 @@ describe('task', () => { await runTask(manifestManager); - expect(logger.info).toHaveBeenCalledWith('recovering from invalid internal manifest'); + expect(logger.warn).toHaveBeenCalledWith('recovering from invalid internal manifest'); expect(logger.error).toHaveBeenNthCalledWith(1, expect.any(InvalidInternalManifestError)); }); diff --git a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts index dafa13141a0c6b..8547eb6dca11c0 100644 --- a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts +++ b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts @@ -22,6 +22,10 @@ import { wrapErrorIfNeeded } from '../../utils'; import { EndpointError } from '../../../../common/endpoint/errors'; export const ManifestTaskConstants = { + /** + * No longer used. Timeout value now comes from `xpack.securitySolution.packagerTaskTimeout` + * @deprecated + */ TIMEOUT: '1m', TYPE: 'endpoint:user-artifact-packager', VERSION: '1.0.0', @@ -44,22 +48,37 @@ export class ManifestTask { constructor(setupContract: ManifestTaskSetupContract) { this.endpointAppContext = setupContract.endpointAppContext; this.logger = this.endpointAppContext.logFactory.get(this.getTaskId()); + const { packagerTaskInterval, packagerTaskTimeout, packagerTaskPackagePolicyUpdateBatchSize } = + this.endpointAppContext.serverConfig; + + this.logger.info( + `Registering ${ManifestTaskConstants.TYPE} task with timeout of [${packagerTaskTimeout}], interval of [${packagerTaskInterval}] and policy update batch size of [${packagerTaskPackagePolicyUpdateBatchSize}]` + ); setupContract.taskManager.registerTaskDefinitions({ [ManifestTaskConstants.TYPE]: { title: 'Security Solution Endpoint Exceptions Handler', - timeout: ManifestTaskConstants.TIMEOUT, + timeout: packagerTaskTimeout, createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { return { run: async () => { - const taskInterval = (await this.endpointAppContext.config()).packagerTaskInterval; - const startTime = new Date().getTime(); + const taskInterval = packagerTaskInterval; + const startTime = new Date(); + + this.logger.info(`Started. Checking for changes to endpoint artifacts`); + await this.runTask(taskInstance.id); + const endTime = new Date().getTime(); - this.logger.debug( - `${ManifestTaskConstants.TYPE} task run took ${endTime - startTime}ms` + + this.logger.info( + `Complete. Task run took ${ + endTime - startTime.getTime() + }ms [ stated: ${startTime.toISOString()} ]` ); + const nextRun = new Date(); + if (taskInterval.endsWith('s')) { const seconds = parseInt(taskInterval.slice(0, -1), 10); nextRun.setSeconds(nextRun.getSeconds() + seconds); @@ -70,12 +89,20 @@ export class ManifestTask { this.logger.error(`Invalid task interval: ${taskInterval}`); return; } + return { state: {}, runAt: nextRun, }; }, - cancel: async () => {}, + cancel: async () => { + // TODO:PT add support for AbortController to Task manager + this.logger.warn( + 'Task run was canceled. Packaging of endpoint artifacts may be taking longer due to the ' + + 'amount of policies/artifacts. Consider increasing the `xpack.securitySolution.packagerTaskTimeout` ' + + 'server configuration setting if this continues' + ); + }, }; }, }, @@ -91,7 +118,7 @@ export class ManifestTask { taskType: ManifestTaskConstants.TYPE, scope: ['securitySolution'], schedule: { - interval: (await this.endpointAppContext.config()).packagerTaskInterval, + interval: this.endpointAppContext.serverConfig.packagerTaskInterval, }, state: {}, params: { version: ManifestTaskConstants.VERSION }, @@ -127,23 +154,29 @@ export class ManifestTask { } try { - let oldManifest: Manifest | null; + let oldManifest: Manifest | null = null; try { // Last manifest we computed, which was saved to ES oldManifest = await manifestManager.getLastComputedManifest(); } catch (e) { + this.logger.error(e); + // Lets recover from a failure in getting the internal manifest map by creating an empty default manifest if (e instanceof InvalidInternalManifestError) { - this.logger.error(e); - this.logger.info('recovering from invalid internal manifest'); + this.logger.warn('recovering from invalid internal manifest'); oldManifest = ManifestManager.createDefaultManifest(); + } else { + this.logger.error( + `unable to recover from error while attempting to retrieve last computed manifest` + ); + + return; } } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (oldManifest! == null) { - this.logger.debug('Last computed manifest not available yet'); + if (!oldManifest) { + this.logger.info('Last computed manifest not available yet'); return; } @@ -152,10 +185,17 @@ export class ManifestTask { const diff = newManifest.diff(oldManifest); + this.logger.debug( + `New -vs- old manifest diff counts: ${Object.entries(diff).map( + ([diffType, diffItems]) => `${diffType}: ${diffItems.length}` + )}` + ); + const persistErrors = await manifestManager.pushArtifacts( diff.additions as InternalArtifactCompleteSchema[], newManifest ); + if (persistErrors.length) { reportErrors(this.logger, persistErrors); throw new Error('Unable to persist new artifacts.'); @@ -167,8 +207,9 @@ export class ManifestTask { await manifestManager.commit(newManifest); } - // Try dispatching to ingest-manager package policies + // Dispatch updates to Fleet integration policies with new manifest info const dispatchErrors = await manifestManager.tryDispatch(newManifest); + if (dispatchErrors.length) { reportErrors(this.logger, dispatchErrors); throw new Error('Error dispatching manifest.'); @@ -178,9 +219,11 @@ export class ManifestTask { const deleteErrors = await manifestManager.deleteArtifacts( diff.removals.map((artifact) => getArtifactId(artifact)) ); + if (deleteErrors.length) { reportErrors(this.logger, deleteErrors); } + await manifestManager.cleanup(newManifest); } catch (err) { this.logger.error(wrapErrorIfNeeded(err)); diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts index 3e00310a5bb640..24c04ee881b8e2 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts @@ -11,9 +11,11 @@ import type { ListArtifactsProps, } from '@kbn/fleet-plugin/server'; import type { ListResult } from '@kbn/fleet-plugin/common'; +import type { FetchAllArtifactsOptions } from '@kbn/fleet-plugin/server/services'; import type { InternalArtifactCompleteSchema } from '../../schemas/artifacts'; -export interface EndpointArtifactClientInterface { +export interface EndpointArtifactClientInterface + extends Pick { getArtifact(id: string): Promise; createArtifact(artifact: InternalArtifactCompleteSchema): Promise; @@ -67,6 +69,15 @@ export class EndpointArtifactClient implements EndpointArtifactClientInterface { return this.fleetArtifacts.listArtifacts(options); } + fetchAll({ + // Our default, unlike the Fleet service, is to NOT include the body of + // the artifact, since we really don't need it when processing all artifacts + includeArtifactBody = false, + ...options + }: FetchAllArtifactsOptions = {}): AsyncIterable { + return this.fleetArtifacts.fetchAll({ ...options, includeArtifactBody }); + } + async createArtifact( artifact: InternalArtifactCompleteSchema ): Promise { diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts index 9ff37c67e613d2..1d935ccb905d0f 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts @@ -39,6 +39,10 @@ import { EndpointError } from '../../../../../common/endpoint/errors'; import type { Artifact } from '@kbn/fleet-plugin/server'; import { AppFeatureSecurityKey } from '@kbn/security-solution-features/keys'; import type { ExceptionListItemSchema } from '@kbn/securitysolution-io-ts-list-types/src/response/exception_list_item_schema'; +import { + createFetchAllArtifactsIterableMock, + generateArtifactMock, +} from '@kbn/fleet-plugin/server/services/artifacts/mocks'; const getArtifactObject = (artifact: InternalArtifactSchema) => JSON.parse(Buffer.from(artifact.body!, 'base64').toString()); @@ -76,12 +80,9 @@ describe('ManifestManager', () => { const ARTIFACT_NAME_BLOCKLISTS_WINDOWS = 'endpoint-blocklist-windows-v1'; const ARTIFACT_NAME_BLOCKLISTS_LINUX = 'endpoint-blocklist-linux-v1'; - const mockPolicyListIdsResponse = (items: string[]) => - jest.fn().mockResolvedValue({ - items, - page: 1, - per_page: 100, - total: items.length, + const getMockPolicyFetchAllItemIds = (items: string[]) => + jest.fn(async function* () { + yield items; }); let ARTIFACTS: InternalArtifactCompleteSchema[] = []; @@ -200,9 +201,7 @@ describe('ManifestManager', () => { ( manifestManagerContext.artifactClient as jest.Mocked - ).listArtifacts.mockImplementation(async () => { - return { items: ARTIFACTS as Artifact[], total: 100, page: 1, perPage: 100 }; - }); + ).fetchAll.mockReturnValue(createFetchAllArtifactsIterableMock([ARTIFACTS as Artifact[]])); const manifest = await manifestManager.getLastComputedManifest(); @@ -259,33 +258,26 @@ describe('ManifestManager', () => { ( manifestManagerContext.artifactClient as jest.Mocked - ).listArtifacts.mockImplementation(async () => { - // report the MACOS Exceptions artifact as not found - return { - items: [ + ).fetchAll.mockReturnValue( + createFetchAllArtifactsIterableMock([ + // report the MACOS Exceptions artifact as not found + [ ARTIFACT_TRUSTED_APPS_MACOS, ARTIFACT_EXCEPTIONS_WINDOWS, ARTIFACT_TRUSTED_APPS_WINDOWS, ARTIFACTS_BY_ID[ARTIFACT_ID_EXCEPTIONS_LINUX], ] as Artifact[], - total: 100, - page: 1, - perPage: 100, - }; - }); + ]) + ); const manifest = await manifestManager.getLastComputedManifest(); expect(manifest?.getAllArtifacts()).toStrictEqual(ARTIFACTS.slice(1, 5)); - expect(manifestManagerContext.logger.error).toHaveBeenCalledWith( - new InvalidInternalManifestError( - `artifact id [${ARTIFACT_ID_EXCEPTIONS_MACOS}] not found!`, - { - entry: ARTIFACTS_BY_ID[ARTIFACT_ID_EXCEPTIONS_MACOS], - action: 'removed from internal ManifestManger tracking map', - } - ) + expect(manifestManagerContext.logger.warn).toHaveBeenCalledWith( + "Missing artifacts detected! Internal artifact manifest (SavedObject version [2.0.0]) references [1] artifact IDs that don't exist.\n" + + "First 10 below (run with logging set to 'debug' to see all):\n" + + 'endpoint-exceptionlist-macos-v1-96b76a1a911662053a1562ac14c4ff1e87c2ff550d6fe52e1e0b3790526597d3' ); }); }); @@ -327,7 +319,9 @@ describe('ManifestManager', () => { const manifestManager = new ManifestManager(context); context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() @@ -389,7 +383,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -460,7 +456,9 @@ describe('ManifestManager', () => { context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({ [ENDPOINT_LIST_ID]: { macos: [exceptionListItem] }, }); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() .mockImplementation((_type: string, object: InternalManifestSchema) => ({ @@ -576,7 +574,7 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([ + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ TEST_POLICY_ID_1, TEST_POLICY_ID_2, ]); @@ -679,7 +677,7 @@ describe('ManifestManager', () => { linux: [trustedAppListItem, trustedAppListItemPolicy2], }, }); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([ + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ TEST_POLICY_ID_1, TEST_POLICY_ID_2, ]); @@ -795,7 +793,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -878,7 +878,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -960,7 +962,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1026,7 +1030,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1068,7 +1074,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1299,12 +1307,9 @@ describe('ManifestManager', () => { }); describe('tryDispatch', () => { - const mockPolicyListResponse = (items: PackagePolicy[]) => - jest.fn().mockResolvedValue({ - items, - page: 1, - per_page: 100, - total: items.length, + const getMockPolicyFetchAllItems = (items: PackagePolicy[]) => + jest.fn(async function* () { + yield items; }); test('Should not dispatch if no policies', async () => { @@ -1313,8 +1318,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - - context.packagePolicyService.list = mockPolicyListResponse([]); + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([]); await expect(manifestManager.tryDispatch(manifest)).resolves.toStrictEqual([]); @@ -1328,7 +1332,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1 }), ]); @@ -1346,7 +1350,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1378,7 +1382,7 @@ describe('ManifestManager', () => { manifest.addEntry(ARTIFACT_EXCEPTIONS_WINDOWS, TEST_POLICY_ID_2); manifest.addEntry(ARTIFACT_TRUSTED_APPS_MACOS, TEST_POLICY_ID_2); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1446,7 +1450,7 @@ describe('ManifestManager', () => { manifest.addEntry(ARTIFACT_EXCEPTIONS_WINDOWS, TEST_POLICY_ID_2); manifest.addEntry(ARTIFACT_TRUSTED_APPS_MACOS, TEST_POLICY_ID_2); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1516,7 +1520,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0', semanticVersion: '1.0.1' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1557,8 +1561,14 @@ describe('ManifestManager', () => { const context = buildManifestManagerContextMock({}); const manifestManager = new ManifestManager(context); + (context.artifactClient.fetchAll as jest.Mock).mockReturnValue( + createFetchAllArtifactsIterableMock([[generateArtifactMock()]]) + ); + context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() @@ -1581,7 +1591,9 @@ describe('ManifestManager', () => { const manifestManager = new ManifestManager(context); context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts index 27f528aba27168..a1ec74bc57b09a 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts @@ -6,15 +6,17 @@ */ import semver from 'semver'; -import { chunk, isEmpty, isEqual, keyBy } from 'lodash'; +import { isEmpty, isEqual, keyBy } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core/server'; import { type Logger, type SavedObjectsClientContract } from '@kbn/core/server'; import { ENDPOINT_LIST_ID, ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants'; -import type { ListResult, PackagePolicy } from '@kbn/fleet-plugin/common'; +import type { PackagePolicy } from '@kbn/fleet-plugin/common'; import type { Artifact, PackagePolicyClient } from '@kbn/fleet-plugin/server'; import type { ExceptionListClient } from '@kbn/lists-plugin/server'; import type { ExceptionListItemSchema } from '@kbn/securitysolution-io-ts-list-types'; import { AppFeatureKey } from '@kbn/security-solution-features/keys'; +import { stringify } from '../../../utils/stringify'; +import { QueueProcessor } from '../../../utils/queue_processor'; import type { AppFeaturesService } from '../../../../lib/app_features_service/app_features_service'; import type { ExperimentalFeatures } from '../../../../../common'; import type { ManifestSchemaVersion } from '../../../../../common/endpoint/schema/common'; @@ -70,24 +72,6 @@ const iterateArtifactsBuildResult = ( } }; -const iterateAllListItems = async ( - pageSupplier: (page: number, perPage: number) => Promise>, - itemCallback: (items: T[]) => void -) => { - let paging = true; - let page = 1; - const perPage = 1000; - - while (paging) { - const { items, total } = await pageSupplier(page, perPage); - - itemCallback(items); - - paging = (page - 1) * perPage + items.length < total; - page++; - } -}; - export interface ManifestManagerContext { savedObjectsClient: SavedObjectsClientContract; artifactClient: EndpointArtifactClientInterface; @@ -407,7 +391,7 @@ export class ManifestManager { } /** - * Writes new artifact SOs. + * Writes new artifact to Fleet * * @param artifacts An InternalArtifactCompleteSchema array representing the artifacts. * @param newManifest A Manifest representing the new manifest @@ -418,7 +402,6 @@ export class ManifestManager { newManifest: Manifest ): Promise { const errors: Error[] = []; - const artifactsToCreate: InternalArtifactCompleteSchema[] = []; for (const artifact of artifacts) { @@ -433,28 +416,58 @@ export class ManifestManager { return errors; } + this.logger.debug(`Creating [${artifactsToCreate.length}] artifacts`); + const { artifacts: fleetArtifacts, errors: createErrors } = await this.artifactClient.bulkCreateArtifacts(artifactsToCreate); + this.logger.info(`Count of artifacts created: ${fleetArtifacts?.length ?? 0}`); + if (createErrors) { errors.push(...createErrors); } + const newArtifactsAddedToManifest: string[] = []; + const artifactsNotCreated: string[] = []; + if (fleetArtifacts) { - const fleetArtfactsByIdentifier: { [key: string]: InternalArtifactCompleteSchema } = {}; + const fleetArtifactsByIdentifier: { [key: string]: InternalArtifactCompleteSchema } = {}; + fleetArtifacts.forEach((fleetArtifact) => { - fleetArtfactsByIdentifier[getArtifactId(fleetArtifact)] = fleetArtifact; + fleetArtifactsByIdentifier[getArtifactId(fleetArtifact)] = fleetArtifact; }); + artifactsToCreate.forEach((artifact) => { const artifactId = getArtifactId(artifact); - const fleetArtifact = fleetArtfactsByIdentifier[artifactId]; + const fleetArtifact = fleetArtifactsByIdentifier[artifactId]; + + if (!fleetArtifact) { + artifactsNotCreated.push(artifactId); + + return; + } - if (!fleetArtifact) return; newManifest.replaceArtifact(fleetArtifact); - this.logger.debug(`New created artifact ${artifactId} added to the manifest`); + newArtifactsAddedToManifest.push(artifactId); }); } + if (artifactsNotCreated.length) { + this.logger.debug( + `A total of [${ + artifactsNotCreated.length + }] artifacts were not created. Prior version of the artifact will remain in manifest.\n${artifactsNotCreated.join( + '\n' + )}` + ); + } + + if (newArtifactsAddedToManifest.length !== 0) { + this.logger.debug( + `Newly created artifacts added to the manifest:\n${newArtifactsAddedToManifest.join('\n')}` + ); + } + return errors; } @@ -469,15 +482,24 @@ export class ManifestManager { if (isEmpty(artifactIds)) { return []; } + const errors = await this.artifactClient.bulkDeleteArtifacts(artifactIds); + if (!isEmpty(errors)) { return errors; } - for (const artifactId of artifactIds) { - this.logger.info(`Cleaned up artifact ${artifactId}`); + + this.logger.info(`Count of cleaned up artifacts: ${artifactIds.length}`); + + if (artifactIds.length !== 0) { + this.logger.debug(`Deleted artifacts from cleanup:\n${artifactIds.join('\n ')}`); } + return []; } catch (err) { + this.logger.error( + `Attempted to delete [${artifactIds.length}] outdated artifacts failed with: ${err.message}\n${err.stack}` + ); return [err]; } } @@ -508,22 +530,35 @@ export class ManifestManager { const fleetArtifacts = await this.listAllArtifacts(); const fleetArtifactsById = keyBy(fleetArtifacts, (artifact) => getArtifactId(artifact)); + const invalidArtifactIds: string[] = []; + // Ensure that all artifacts currently defined in the Manifest have a valid artifact in fleet, + // and remove any that does not have an actual artifact from the manifest for (const entry of manifestSo.attributes.artifacts) { const artifact = fleetArtifactsById[entry.artifactId]; if (!artifact) { - this.logger.error( - new InvalidInternalManifestError(`artifact id [${entry.artifactId}] not found!`, { - entry, - action: 'removed from internal ManifestManger tracking map', - }) - ); + invalidArtifactIds.push(entry.artifactId); } else { manifest.addEntry(artifact, entry.policyId); } } + if (invalidArtifactIds.length) { + this.logger.warn( + `Missing artifacts detected! Internal artifact manifest (SavedObject version [${ + manifestSo.version + }]) references [${ + invalidArtifactIds.length + }] artifact IDs that don't exist.\nFirst 10 below (run with logging set to 'debug' to see all):\n${invalidArtifactIds + .slice(0, 10) + .join('\n')}` + ); + this.logger.debug( + `Artifact ID references that are missing:\n${stringify(invalidArtifactIds)}` + ); + } + return manifest; } catch (error) { if (!error.output || error.output.statusCode !== 404) { @@ -569,15 +604,10 @@ export class ManifestManager { for (const result of results) { iterateArtifactsBuildResult(result, (artifact, policyId) => { - const artifactToAdd = baselineManifest.getArtifact(getArtifactId(artifact)) || artifact; - if (!internalArtifactCompleteSchema.is(artifactToAdd)) { - throw new EndpointError( - `Incomplete artifact detected: ${getArtifactId(artifactToAdd)}`, - artifactToAdd - ); - } - - manifest.addEntry(artifactToAdd, policyId); + manifest.addEntry( + baselineManifest.getArtifact(getArtifactId(artifact)) || artifact, + policyId + ); }); } @@ -592,81 +622,93 @@ export class ManifestManager { * @returns {Promise} Any errors encountered. */ public async tryDispatch(manifest: Manifest): Promise { - const allPackagePolicies: PackagePolicy[] = []; - await iterateAllListItems( - (page, perPage) => this.listEndpointPolicies(page, perPage), - (packagePoliciesBatch) => { - allPackagePolicies.push(...packagePoliciesBatch); - } - ); + const errors: Error[] = []; + const updatedPolicies: string[] = []; + const unChangedPolicies: string[] = []; + const manifestVersion = manifest.getSemanticVersion(); + const execId = Math.random().toString(32).substring(3, 8); + const policyUpdateBatchProcessor = new QueueProcessor({ + batchSize: this.packagerTaskPackagePolicyUpdateBatchSize, + logger: this.logger, + key: `tryDispatch.${execId}`, + batchHandler: async ({ data: currentBatch }) => { + const response = await this.packagePolicyService.bulkUpdate( + this.savedObjectsClient, + this.esClient, + currentBatch + ); - const packagePoliciesToUpdate: PackagePolicy[] = []; + if (!isEmpty(response.failedPolicies)) { + errors.push( + ...response.failedPolicies.map((failedPolicy) => { + if (failedPolicy.error instanceof Error) { + return failedPolicy.error; + } else { + return new Error(failedPolicy.error.message); + } + }) + ); + } - const errors: Error[] = []; - allPackagePolicies.forEach((packagePolicy) => { - const { id } = packagePolicy; - if (packagePolicy.inputs.length > 0 && packagePolicy.inputs[0].config !== undefined) { - const oldManifest = packagePolicy.inputs[0].config.artifact_manifest ?? { - value: {}, - }; - - const newManifestVersion = manifest.getSemanticVersion(); - if (semver.gt(newManifestVersion, oldManifest.value.manifest_version)) { - const serializedManifest = manifest.toPackagePolicyManifest(id); - - if (!manifestDispatchSchema.is(serializedManifest)) { - errors.push(new EndpointError(`Invalid manifest for policy ${id}`, serializedManifest)); - } else if (!manifestsEqual(serializedManifest, oldManifest.value)) { - packagePolicy.inputs[0].config.artifact_manifest = { value: serializedManifest }; - packagePoliciesToUpdate.push(packagePolicy); + if (response.updatedPolicies) { + updatedPolicies.push( + ...response.updatedPolicies.map((policy) => { + return `[${policy.id}][${policy.name}] updated with manifest version: [${manifestVersion}]`; + }) + ); + } + }, + }); + + for await (const policies of this.fetchAllPolicies()) { + for (const packagePolicy of policies) { + const { id, name } = packagePolicy; + + if (packagePolicy.inputs.length > 0 && packagePolicy.inputs[0].config !== undefined) { + const oldManifest = packagePolicy.inputs[0].config.artifact_manifest ?? { + value: {}, + }; + + const newManifestVersion = manifest.getSemanticVersion(); + + if (semver.gt(newManifestVersion, oldManifest.value.manifest_version)) { + const serializedManifest = manifest.toPackagePolicyManifest(id); + + if (!manifestDispatchSchema.is(serializedManifest)) { + errors.push( + new EndpointError(`Invalid manifest for policy ${id}`, serializedManifest) + ); + } else if (!manifestsEqual(serializedManifest, oldManifest.value)) { + packagePolicy.inputs[0].config.artifact_manifest = { value: serializedManifest }; + policyUpdateBatchProcessor.addToQueue(packagePolicy); + } else { + unChangedPolicies.push(`[${id}][${name}] No change in manifest content`); + } } else { - this.logger.debug( - `No change in manifest content for package policy: ${id}. Staying on old version` - ); + unChangedPolicies.push(`[${id}][${name}] No change in manifest version`); } } else { - this.logger.debug(`No change in manifest version for package policy: ${id}`); + errors.push( + new EndpointError(`Package Policy ${id} has no 'inputs[0].config'`, packagePolicy) + ); } - } else { - errors.push( - new EndpointError(`Package Policy ${id} has no 'inputs[0].config'`, packagePolicy) - ); } - }); + } - // Split updates in batches with batch size: packagerTaskPackagePolicyUpdateBatchSize - const updateBatches = chunk( - packagePoliciesToUpdate, - this.packagerTaskPackagePolicyUpdateBatchSize + await policyUpdateBatchProcessor.complete(); + + this.logger.info( + `Processed [${updatedPolicies.length + unChangedPolicies.length}] Policies: updated: [${ + updatedPolicies.length + }], un-changed: [${unChangedPolicies.length}]` ); - for (const currentBatch of updateBatches) { - const response = await this.packagePolicyService.bulkUpdate( - this.savedObjectsClient, - this.esClient, - currentBatch - ); + if (updatedPolicies.length) { + this.logger.debug(`Updated Policies:\n ${updatedPolicies.join('\n ')}`); + } - // Update errors - if (!isEmpty(response.failedPolicies)) { - errors.push( - ...response.failedPolicies.map((failedPolicy) => { - if (failedPolicy.error instanceof Error) { - return failedPolicy.error; - } else { - return new Error(failedPolicy.error.message); - } - }) - ); - } - // Log success updates - for (const updatedPolicy of response.updatedPolicies || []) { - this.logger.debug( - `Updated package policy ${ - updatedPolicy.id - } with manifest version ${manifest.getSemanticVersion()}` - ); - } + if (unChangedPolicies.length) { + this.logger.debug(`Un-changed Policies:\n ${unChangedPolicies.join('\n ')}`); } return errors; @@ -696,31 +738,24 @@ export class ManifestManager { this.logger.info(`Committed manifest ${manifest.getSemanticVersion()}`); } - private async listEndpointPolicies( - page: number, - perPage: number - ): Promise> { - return this.packagePolicyService.list(this.savedObjectsClient, { - page, - perPage, + private fetchAllPolicies(): AsyncIterable { + return this.packagePolicyService.fetchAllItems(this.savedObjectsClient, { kuery: 'ingest-package-policies.package.name:endpoint', }); } private async listEndpointPolicyIds(): Promise { const allPolicyIds: string[] = []; - await iterateAllListItems( - (page, perPage) => { - return this.packagePolicyService.listIds(this.savedObjectsClient, { - page, - perPage, - kuery: 'ingest-package-policies.package.name:endpoint', - }); - }, - (packagePolicyIdsBatch) => { - allPolicyIds.push(...packagePolicyIdsBatch); - } - ); + const idFetcher = this.packagePolicyService.fetchAllItemIds(this.savedObjectsClient, { + kuery: 'ingest-package-policies.package.name:endpoint', + }); + + for await (const itemIds of idFetcher) { + allPolicyIds.push(...itemIds); + } + + this.logger.debug(`Retrieved [${allPolicyIds.length}] endpoint integration policy IDs`); + return allPolicyIds; } @@ -733,70 +768,68 @@ export class ManifestManager { * @returns Artifact[] */ private async listAllArtifacts(): Promise { - const fleetArtifacts = []; - const perPage = 100; - let page = 1; + const fleetArtifacts: Artifact[] = []; + let total = 0; - let fleetArtifactsResponse = await this.artifactClient.listArtifacts({ - perPage, - page, - }); - fleetArtifacts.push(...fleetArtifactsResponse.items); - - while ( - fleetArtifactsResponse.total > fleetArtifacts.length && - !isEmpty(fleetArtifactsResponse.items) - ) { - page += 1; - fleetArtifactsResponse = await this.artifactClient.listArtifacts({ - perPage, - page, - }); - fleetArtifacts.push(...fleetArtifactsResponse.items); + for await (const artifacts of this.artifactClient.fetchAll()) { + fleetArtifacts.push(...artifacts); + total += artifacts.length; } + + this.logger.info(`Count of current stored artifacts: ${total}`); + return fleetArtifacts; } /** - * Cleanup .fleet-artifacts index if there are some orphan artifacts + * Pulls in all artifacts from Fleet and checks to ensure they are all being referenced + * by the Manifest. If any are found to not be in the current Manifest (orphan), they + * are cleaned up (deleted) */ public async cleanup(manifest: Manifest) { - try { - const fleetArtifacts = await this.listAllArtifacts(); - if (isEmpty(fleetArtifacts)) { - return; - } - - const badArtifacts = []; - const badArtifactIds = []; + const badArtifactIds: string[] = []; + const errors: string[] = []; + const artifactDeletionProcess = new QueueProcessor({ + batchSize: this.packagerTaskPackagePolicyUpdateBatchSize, + logger: this.logger, + key: 'cleanup', + batchHandler: async ({ batch, data }) => { + const deleteErrors = await this.artifactClient.bulkDeleteArtifacts(data); + + badArtifactIds.push(...data); + + if (deleteErrors.length) { + errors.push( + `Delete batch #[${batch}] with [${data.length}] items:\n${stringify(deleteErrors)}` + ); + } + }, + }); - const manifestArtifactsIds = manifest - .getAllArtifacts() - .map((artifact) => getArtifactId(artifact)); + const validArtifactIds = manifest.getAllArtifacts().map((artifact) => getArtifactId(artifact)); - for (const fleetArtifact of fleetArtifacts) { - const artifactId = getArtifactId(fleetArtifact); - const isArtifactInManifest = manifestArtifactsIds.includes(artifactId); + for await (const artifacts of this.artifactClient.fetchAll()) { + for (const artifact of artifacts) { + const artifactId = getArtifactId(artifact); + const isArtifactInManifest = validArtifactIds.includes(artifactId); if (!isArtifactInManifest) { - badArtifacts.push(fleetArtifact); - badArtifactIds.push(artifactId); + artifactDeletionProcess.addToQueue(artifactId); } } + } - if (isEmpty(badArtifacts)) { - return; - } + await artifactDeletionProcess.complete(); + if (errors.length > 0) { this.logger.error( - new EndpointError(`Cleaning up ${badArtifacts.length} orphan artifacts`, badArtifacts) + `The following errors were encountered while attempting to delete [${ + badArtifactIds.length + }] orphaned artifacts:\n${stringify(errors)}` ); - - await this.artifactClient.bulkDeleteArtifacts(badArtifactIds); - - this.logger.info(`All orphan artifacts has been removed successfully`); - } catch (error) { - this.logger.error(new EndpointError('There was an error cleaning orphan artifacts', error)); + } else if (badArtifactIds.length > 0) { + this.logger.info(`Count of orphan artifacts cleaned up: ${badArtifactIds.length}`); + this.logger.debug(`Orphan artifacts deleted from Fleet:\n${stringify(badArtifactIds)}`); } } } diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts index 2acfa7b7b47945..1a1dd701e9803e 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts @@ -65,6 +65,7 @@ export const createEndpointArtifactClientMock = ( bulkDeleteArtifacts: jest.fn(async (...args) => endpointArtifactClientMocked.bulkDeleteArtifacts(...args) ), + fetchAll: jest.fn((...args) => endpointArtifactClientMocked.fetchAll(...args)), _esClient: esClient, }; }; diff --git a/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts b/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts new file mode 100644 index 00000000000000..f4f3e4ac76852f --- /dev/null +++ b/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/core/server'; + +export interface QueueProcessorOptions { + batchHandler: (batch: { batch: number; data: T[] }) => Promise; + batchSize?: number; + logger?: Logger; + /** + * Used when `logger` is passed. It will be used to define the logging messages context path. + * Defaults to the name of the callback provided in `batchHandler` + */ + key?: string; +} + +/** + * Process an un-bound amount of items in batches. Each batch is process once the queued reach the + * `batchSize`, thus processing is gradually executed ensuring that data is not held in memory + * for too long. Once all items are added to the Queue, calling + * `.complete()` will ensure they are all processed. + * + * @example + * const processor = new QueueProcessor<{ id: string }>({ + * batchHandler: ({ data, batch }) => { + * // data === array of `{ id: string }` + * // batch === batch number + * } + * }); + * + * const myIdList = [ .... ]; // Array with 50 string + * + * for (const id of myIdList) { + * batchHandler.addToQueue({ id: id}); + * } + * + * await processor.complete(); + */ +export class QueueProcessor { + private readonly batchSize: number; + private readonly batchHandler: QueueProcessorOptions['batchHandler']; + private readonly logger: Logger | undefined = undefined; + + private queue: T[] = []; + private processingPromise: Promise | undefined = undefined; + private batchCount = 0; + private itemsProcessedCount = 0; + + constructor({ + batchHandler, + batchSize = 10, + logger, + key = 'QueueProcessor', + }: QueueProcessorOptions) { + if (batchSize < 1 || !Number.isFinite(batchSize)) { + throw new Error(`batchSize must be a number greater than zero`); + } + + this.batchSize = batchSize; + this.batchHandler = batchHandler; + this.logger = logger ? logger.get(key) : undefined; + } + + protected log( + message: string, + output: keyof Pick = 'info' + ): void { + if (this.logger) { + this.logger[output](message); + } + } + + protected async processQueue(all: boolean = false) { + if (this.processingPromise || this.queue.length === 0) { + return; + } + + const runThroughQueue = async () => { + let hasMoreData = true; + + while (hasMoreData) { + try { + if (all || this.queue.length >= this.batchSize) { + const batchPage = this.queue.splice(0, this.batchSize); + const batchPageSize = batchPage.length; + const remainingItemsSize = this.queue.length; + + hasMoreData = (all && remainingItemsSize > 0) || remainingItemsSize >= this.batchSize; + this.itemsProcessedCount += batchPageSize; + this.batchCount++; + + try { + this.log( + `Processing batch [${this.batchCount}] with [${batchPageSize}] items. Items remaining in queue: [${remainingItemsSize}]`, + 'debug' + ); + await this.batchHandler({ batch: this.batchCount, data: batchPage }); + } catch (err) { + this.log( + `batchHandler threw error (below). Will continue on to next batch page:\n${err}`, + 'debug' + ); + // ignore errors in the batch page processing and keep going to process others. + // callback should have handled errors that its process might throw + } + } else { + hasMoreData = false; + } + } catch (err) { + hasMoreData = false; + throw err; + } + } + }; + + this.processingPromise = runThroughQueue().finally(() => { + this.processingPromise = undefined; + }); + + return this.processingPromise; + } + + /** + * Adds an update to the queue + */ + public addToQueue(...data: T[]) { + this.queue.push(...data); + this.processQueue(); + } + + /** + * Flushes the queue and awaits processing of all remaining updates. + * + * **IMPORTANT**: Always make sure `complete()` is called to ensure no items are left in the queue + */ + public async complete(): Promise { + if (this.processingPromise) { + await this.processingPromise.finally(() => {}); + } + + await this.processQueue(true); + + this.log( + `Processed [${this.batchCount}] batches and a total of [${this.itemsProcessedCount}] items`, + 'debug' + ); + } +} diff --git a/x-pack/plugins/security_solution/server/plugin.ts b/x-pack/plugins/security_solution/server/plugin.ts index aaa104533e080d..0bc81fc0b17fd3 100644 --- a/x-pack/plugins/security_solution/server/plugin.ts +++ b/x-pack/plugins/security_solution/server/plugin.ts @@ -532,7 +532,7 @@ export class Plugin implements ISecuritySolutionPlugin { artifactClient, exceptionListClient, packagePolicyService: plugins.fleet.packagePolicyService, - logger, + logger: this.pluginContext.logger.get('ManifestManager'), experimentalFeatures: config.experimentalFeatures, packagerTaskPackagePolicyUpdateBatchSize: config.packagerTaskPackagePolicyUpdateBatchSize, esClient: core.elasticsearch.client.asInternalUser,