Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side create/update ingest pipelines #62744

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ describe('pipeline_serialization', () => {
},
},
],
on_failure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
pipeline2: {
description: 'pipeline2 description',
Expand All @@ -39,6 +47,14 @@ describe('pipeline_serialization', () => {
},
},
],
onFailure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
{
name: 'pipeline2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@ import { PipelinesByName, Pipeline } from '../types';
export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
const pipelineNames: string[] = Object.keys(pipelinesByName);

const deserializedTemplates = pipelineNames.map((name: string) => {
const { description, version, processors } = pipelinesByName[name];
const deserializedPipelines = pipelineNames.map((name: string) => {
const { description, version, processors, on_failure } = pipelinesByName[name];

return {
const pipeline = {
name,
description,
version,
processors,
onFailure: on_failure,
};

// Remove any undefined values
return Object.entries(pipeline).reduce((pipelineDefinition: any, [key, value]) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what issues was having e.g., { version: undefined ... } causing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed we had followed this pattern elsewhere, but you bring up a good point. It's not causing any issues atm, so I removed this logic for now.

if (value !== undefined) {
pipelineDefinition[key] = value;
}
return pipelineDefinition;
}, {});
});

return deserializedTemplates;
return deserializedPipelines;
}
8 changes: 7 additions & 1 deletion x-pack/plugins/ingest_pipelines/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ export interface Pipeline {
description: string;
version?: number;
processors: Processor[];
onFailure?: Processor[];
}

export interface PipelinesByName {
[key: string]: Omit<Pipeline, 'name'>;
[key: string]: {
description: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
};
}
83 changes: 83 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';

import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';

const bodySchema = schema.object({
name: schema.string(),
description: schema.string(),
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

export const registerCreateRoute = ({
router,
license,
lib: { isEsError },
}: RouteDependencies): void => {
router.put(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow REST principles it would be better to use POST to create a resource. Tiny change but it goes a long way 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I can change this in a follow-up PR. I think I may have used put here originally to align with the underlying ES API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change made via 6bba3da

{
path: API_BASE_PATH,
validate: {
body: bodySchema,
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
const pipeline = req.body as Pipeline;

const { name, description, processors, version, onFailure } = pipeline;

try {
// Check that a pipeline with the same name doesn't already exist
const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name });

if (pipelineByName[name]) {
return res.conflict({
body: new Error(
i18n.translate('xpack.ingestPipelines.createRoute.duplicatePipelineIdErrorMessage', {
defaultMessage: "There is already a pipeline with name '{name}'.",
values: {
name,
},
})
),
});
}
} catch (e) {
// Silently swallow error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we check for 404 here as a success indicator and return internal error otherwise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we would want to block on this. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave this as is for now, so I can merge this PR and start working on the UI. I'm open to revisiting it in a follow-up PR though.

}

try {
const response = await callAsCurrentUser('ingest.putPipeline', {
id: name,
body: {
description,
processors,
version,
on_failure: onFailure,
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

return res.internalError({ body: error });
}
})
);
};
4 changes: 4 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
*/

export { registerGetRoutes } from './get';

export { registerCreateRoute } from './create';

export { registerUpdateRoute } from './update';
70 changes: 70 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';

import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';

const bodySchema = schema.object({
description: schema.string(),
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

const paramsSchema = schema.object({
name: schema.string(),
});

export const registerUpdateRoute = ({
router,
license,
lib: { isEsError },
}: RouteDependencies): void => {
router.put(
{
path: `${API_BASE_PATH}/{name}`,
validate: {
body: bodySchema,
params: paramsSchema,
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
const { name } = req.params as typeof paramsSchema.type;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider modifying license service gaurdApiRoute to preserve type information:

  guardApiRoute<P, Q, B>(handler: RequestHandler<P, Q, B>) {
    const license = this;

    return function licenseCheck(
      ctx: RequestHandlerContext,
      request: KibanaRequest<P, Q, B>,

Then we can remove as declarations (although I can see we may want to keep as Pipeline on the body).

const pipeline = req.body as Pipeline;

const { description, processors, version, onFailure } = pipeline;

try {
// Verify pipeline exists; ES will throw 404 if it doesn't
await callAsCurrentUser('ingest.getPipeline', { id: name });

const response = await callAsCurrentUser('ingest.putPipeline', {
id: name,
body: {
description,
processors,
version,
on_failure: onFailure,
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

return res.internalError({ body: error });
}
})
);
};
4 changes: 3 additions & 1 deletion x-pack/plugins/ingest_pipelines/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import { RouteDependencies } from '../types';

import { registerGetRoutes } from './api';
import { registerGetRoutes, registerCreateRoute, registerUpdateRoute } from './api';

export class ApiRoutes {
setup(dependencies: RouteDependencies) {
registerGetRoutes(dependencies);
registerCreateRoute(dependencies);
registerUpdateRoute(dependencies);
}
}
1 change: 1 addition & 0 deletions x-pack/test/api_integration/apis/management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export default function({ loadTestFile }) {
loadTestFile(require.resolve('./rollup'));
loadTestFile(require.resolve('./index_management'));
loadTestFile(require.resolve('./index_lifecycle_management'));
loadTestFile(require.resolve('./ingest_pipelines'));
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';

export default function({ loadTestFile }: FtrProviderContext) {
describe('Ingest Node Pipelines', () => {
loadTestFile(require.resolve('./ingest_pipelines'));
});
}
Loading