From 73951f9f64928d34e2957287fc0137c7fe09b982 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Tue, 2 Jan 2024 19:10:31 -0500 Subject: [PATCH] [8.12] Handle content stream errors in report pre-deletion (#173792) (#174139) # Backport This will backport the following commits from `main` to `8.12`: - [Handle content stream errors in report pre-deletion (#173792)](https://github.com/elastic/kibana/pull/173792) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Tim Sullivan --- .../routes/common/jobs/get_job_routes.ts | 44 ++- .../management/integration_tests/jobs.test.ts | 262 ++++++++++-------- .../public/integration_tests/jobs.test.ts | 210 +++++++------- 3 files changed, 295 insertions(+), 221 deletions(-) diff --git a/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts b/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts index a88abae999be05..ca25b13990dc81 100644 --- a/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts +++ b/x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => { return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => { const docIndex = doc.index; const stream = await getContentStream(reporting, { id: docId, index: docIndex }); - /** @note Overwriting existing content with an empty buffer to remove all the chunks. */ - await new Promise((resolve) => { - stream.end('', 'utf8', () => { - resolve(); - }); + const reportingSetup = reporting.getPluginSetupDeps(); + const logger = reportingSetup.logger.get('delete-report'); + + // An "error" event is emitted if an error is + // passed to the `stream.end` callback from + // the _final method of the ContentStream. + // This event must be handled. + stream.on('error', (err) => { + logger.error(err); }); - await jobsQuery.delete(docIndex, docId); - return res.ok({ - body: { deleted: true }, - }); + try { + // Overwriting existing content with an + // empty buffer to remove all the chunks. + await new Promise((resolve, reject) => { + stream.end('', 'utf8', (error?: Error) => { + if (error) { + // handle error that could be thrown + // from the _write method of the ContentStream + reject(error); + } else { + resolve(); + } + }); + }); + + await jobsQuery.delete(docIndex, docId); + + return res.ok({ + body: { deleted: true }, + }); + } catch (error) { + logger.error(error); + return res.customError({ + statusCode: 500, + }); + } }); }; diff --git a/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts b/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts index decc2300026e35..6fe086f33faa5f 100644 --- a/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs' type SetupServerReturn = Awaited>; -describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { +describe(`Reporting Job Management Routes: Internal`, () => { const reportingSymbol = Symbol('reporting'); let server: SetupServerReturn['server']; let usageCounter: IUsageCounter; @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { await server.stop(); }); - it('fails on malformed download IDs', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutes(core); + describe('download report', () => { + it('fails on malformed download IDs', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) - .expect(400) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot( - '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' - ) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) + .expect(400) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot( + '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' + ) + ); + }); - it('fails on unauthenticated users', async () => { - mockStartDeps = await createMockPluginStart( - { - licensing: { - ...licensingMock.createStart(), - license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + it('fails on unauthenticated users', async () => { + mockStartDeps = await createMockPluginStart( + { + licensing: { + ...licensingMock.createStart(), + license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + }, + security: { authc: { getCurrentUser: () => undefined } }, }, - security: { authc: { getCurrentUser: () => undefined } }, - }, - mockConfigSchema - ); - core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); - registerJobInfoRoutes(core); + mockConfigSchema + ); + core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) - .expect(401) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) + .expect(401) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) + ); + }); - it('returns 404 if job not found', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutes(core); + it('returns 404 if job not found', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(404); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(404); + }); - it('returns a 403 if not a valid job type', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'invalidJobType', - payload: { title: 'invalid!' }, - }) - ); - registerJobInfoRoutes(core); + it('returns a 403 if not a valid job type', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'invalidJobType', + payload: { title: 'invalid!' }, + }) + ); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(403); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(403); + }); - it(`returns job's info`, async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: mockJobTypeBase64Encoded, - payload: {}, // payload is irrelevant - }) - ); + it(`returns job's info`, async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: mockJobTypeBase64Encoded, + payload: {}, // payload is irrelevant + }) + ); - registerJobInfoRoutes(core); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) - .expect(200); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) + .expect(200); + }); - it(`returns 403 if a user cannot view a job's info`, async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'customForbiddenJobType', - payload: {}, // payload is irrelevant - }) - ); + it(`returns 403 if a user cannot view a job's info`, async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'customForbiddenJobType', + payload: {}, // payload is irrelevant + }) + ); - registerJobInfoRoutes(core); + registerJobInfoRoutes(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) - .expect(403); - }); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`) + .expect(403); + }); - it('when a job is incomplete', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: mockJobTypeUnencoded, - status: 'pending', - payload: { title: 'incomplete!' }, - }) - ); - registerJobInfoRoutes(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(503) - .expect('Content-Type', 'text/plain; charset=utf-8') - .expect('Retry-After', '30') - .then(({ text }) => expect(text).toEqual('pending')); - }); + it('when a job is incomplete', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: mockJobTypeUnencoded, + status: 'pending', + payload: { title: 'incomplete!' }, + }) + ); + registerJobInfoRoutes(core); - it('when a job fails', async () => { - mockEsClient.search.mockResponse( - getHits({ - jobtype: mockJobTypeUnencoded, - status: 'failed', - output: { content: 'job failure message' }, - payload: { title: 'failing job!' }, - }) - ); - registerJobInfoRoutes(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(500) - .expect('Content-Type', 'application/json; charset=utf-8') - .then(({ body }) => - expect(body.message).toEqual('Reporting generation failed: job failure message') + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(503) + .expect('Content-Type', 'text/plain; charset=utf-8') + .expect('Retry-After', '30') + .then(({ text }) => expect(text).toEqual('pending')); + }); + + it('when a job fails', async () => { + mockEsClient.search.mockResponse( + getHits({ + jobtype: mockJobTypeUnencoded, + status: 'failed', + output: { content: 'job failure message' }, + payload: { title: 'failing job!' }, + }) ); - }); + registerJobInfoRoutes(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8') + .then(({ body }) => + expect(body.message).toEqual('Reporting generation failed: job failure message') + ); + }); - describe('successful downloads', () => { it('when a known job-type is complete', async () => { mockEsClient.search.mockResponseOnce(getCompleteHits()); registerJobInfoRoutes(core); @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { }); }); }); + + describe('delete report', () => { + it('handles content stream errors', async () => { + stream = new Readable({ + read() { + this.push('test'); + this.push(null); + }, + }) as typeof stream; + stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => { + callback(new Error('An error occurred in ending the content stream')); + }); + + (getContentStream as jest.MockedFunction).mockResolvedValue(stream); + mockEsClient.search.mockResponseOnce(getCompleteHits()); + registerJobInfoRoutes(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8'); + }); + }); }); diff --git a/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts b/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts index b301b1546dabfd..1d2ef78cf60e3d 100644 --- a/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts +++ b/x-pack/plugins/reporting/server/routes/public/integration_tests/jobs.test.ts @@ -36,7 +36,7 @@ import { registerJobInfoRoutesPublic } from '../jobs'; type SetupServerReturn = Awaited>; -describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { +describe(`Reporting Job Management Routes: Public`, () => { const reportingSymbol = Symbol('reporting'); let server: SetupServerReturn['server']; let usageCounter: IUsageCounter; @@ -135,114 +135,114 @@ describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { await server.stop(); }); - it('fails on malformed download IDs', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutesPublic(core); + describe('download report', () => { + it('fails on malformed download IDs', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutesPublic(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) - .expect(400) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot( - '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' - ) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`) + .expect(400) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot( + '"[request params.docId]: value has length [1] but it must have a minimum length of [3]."' + ) + ); + }); - it('fails on unauthenticated users', async () => { - mockStartDeps = await createMockPluginStart( - { - licensing: { - ...licensingMock.createStart(), - license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + it('fails on unauthenticated users', async () => { + mockStartDeps = await createMockPluginStart( + { + licensing: { + ...licensingMock.createStart(), + license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }), + }, + security: { authc: { getCurrentUser: () => undefined } }, }, - security: { authc: { getCurrentUser: () => undefined } }, - }, - mockConfigSchema - ); - core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); - registerJobInfoRoutesPublic(core); + mockConfigSchema + ); + core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps); + registerJobInfoRoutesPublic(core); - await server.start(); + await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) - .expect(401) - .then(({ body }) => - expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) - ); - }); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`) + .expect(401) + .then(({ body }) => + expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`) + ); + }); + + it('returns 404 if job not found', async () => { + mockEsClient.search.mockResponseOnce(getHits()); + registerJobInfoRoutesPublic(core); - it('returns 404 if job not found', async () => { - mockEsClient.search.mockResponseOnce(getHits()); - registerJobInfoRoutesPublic(core); + await server.start(); - await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(404); + }); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(404); - }); + it('returns a 403 if not a valid job type', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'invalidJobType', + payload: { title: 'invalid!' }, + }) + ); + registerJobInfoRoutesPublic(core); - it('returns a 403 if not a valid job type', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'invalidJobType', - payload: { title: 'invalid!' }, - }) - ); - registerJobInfoRoutesPublic(core); + await server.start(); - await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) + .expect(403); + }); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`) - .expect(403); - }); + it('when a job is incomplete', async () => { + mockEsClient.search.mockResponseOnce( + getHits({ + jobtype: 'unencodedJobType', + status: 'pending', + payload: { title: 'incomplete!' }, + }) + ); + registerJobInfoRoutesPublic(core); - it('when a job is incomplete', async () => { - mockEsClient.search.mockResponseOnce( - getHits({ - jobtype: 'unencodedJobType', - status: 'pending', - payload: { title: 'incomplete!' }, - }) - ); - registerJobInfoRoutesPublic(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(503) - .expect('Content-Type', 'text/plain; charset=utf-8') - .expect('Retry-After', '30') - .then(({ text }) => expect(text).toEqual('pending')); - }); + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(503) + .expect('Content-Type', 'text/plain; charset=utf-8') + .expect('Retry-After', '30') + .then(({ text }) => expect(text).toEqual('pending')); + }); - it('when a job fails', async () => { - mockEsClient.search.mockResponse( - getHits({ - jobtype: 'unencodedJobType', - status: 'failed', - output: { content: 'job failure message' }, - payload: { title: 'failing job!' }, - }) - ); - registerJobInfoRoutesPublic(core); - - await server.start(); - await supertest(httpSetup.server.listener) - .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) - .expect(500) - .expect('Content-Type', 'application/json; charset=utf-8') - .then(({ body }) => - expect(body.message).toEqual('Reporting generation failed: job failure message') + it('when a job fails', async () => { + mockEsClient.search.mockResponse( + getHits({ + jobtype: 'unencodedJobType', + status: 'failed', + output: { content: 'job failure message' }, + payload: { title: 'failing job!' }, + }) ); - }); + registerJobInfoRoutesPublic(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .get(`${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`) + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8') + .then(({ body }) => + expect(body.message).toEqual('Reporting generation failed: job failure message') + ); + }); - describe('successful downloads', () => { it('when a known job-type is complete', async () => { mockEsClient.search.mockResponseOnce(getCompleteHits()); registerJobInfoRoutesPublic(core); @@ -292,4 +292,28 @@ describe(`GET ${PUBLIC_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => { }); }); }); + + describe('delete report', () => { + it('handles content stream errors', async () => { + stream = new Readable({ + read() { + this.push('test'); + this.push(null); + }, + }) as typeof stream; + stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => { + callback(new Error('An error occurred in ending the content stream')); + }); + + (getContentStream as jest.MockedFunction).mockResolvedValue(stream); + mockEsClient.search.mockResponseOnce(getCompleteHits()); + registerJobInfoRoutesPublic(core); + + await server.start(); + await supertest(httpSetup.server.listener) + .delete('/api/reporting/jobs/delete/denk') + .expect(500) + .expect('Content-Type', 'application/json; charset=utf-8'); + }); + }); });