Skip to content

Commit

Permalink
[Connectors] Use Connector API to create a connector (#183398)
Browse files Browse the repository at this point in the history
## Summary

Use [Connector API
endpoint](https://www.elastic.co/guide/en/elasticsearch/reference/master/create-connector-api.html)
in the create connectors action. Note:
elastic/elasticsearch#109248 was merged into ES
very recently, you might need to pull latest ES image to get this
working.

Note: some crawler features also utilise connector index, since it was
agreed not to support those features in the Connector API I'm leaving
crawler related logic unchanged

### Validation
- Add unit tests
- Test locally with stack
- Test locally with serverless
  • Loading branch information
jedrazb authored Jun 6, 2024
1 parent 584362e commit 678ffa0
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 19 deletions.
198 changes: 198 additions & 0 deletions packages/kbn-search-connectors/lib/create_connector.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { errors } from '@elastic/elasticsearch';

import { ElasticsearchClient } from '@kbn/core/server';
import { FeatureName } from '../types';

import { createConnector } from './create_connector';

const notFoundError = new errors.ResponseError({
statusCode: 404,
body: {
error: {
type: `document_missing_exception`,
},
},
} as any);

describe('createConnector lib', () => {
const mockClient = {
transport: {
request: jest.fn(),
},
};
beforeEach(() => {
jest.clearAllMocks();
});

it('should create connector with _connector API endpoint', async () => {
const connectorId = 'connectorId';
const mockConnector = {
id: connectorId,
index_name: 'indexName',
language: 'en',
is_native: true,
};
mockClient.transport.request
.mockResolvedValueOnce({ id: connectorId })
.mockResolvedValueOnce(mockConnector);

await expect(
createConnector(mockClient as unknown as ElasticsearchClient, {
isNative: true,
indexName: mockConnector.index_name,
language: mockConnector.language,
})
).resolves.toEqual(mockConnector);
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: `/_connector`,
body: {
index_name: 'indexName',
language: 'en',
is_native: true,
name: '',
},
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'GET',
path: `/_connector/${connectorId}`,
});
});

it('should update pipeline params if provided', async () => {
const connectorId = 'connectorId';
const mockConnector = {
id: connectorId,
index_name: 'indexName',
language: 'en',
is_native: true,
};

const mockPipeline = {
extract_binary_content: true,
name: 'test',
reduce_whitespace: true,
run_ml_inference: true,
};

mockClient.transport.request
.mockResolvedValueOnce({ id: connectorId })
.mockResolvedValueOnce({ result: 'updated' })
.mockResolvedValueOnce(mockConnector);

await expect(
createConnector(mockClient as unknown as ElasticsearchClient, {
isNative: true,
indexName: 'indexName',
language: 'en',
pipeline: mockPipeline,
})
).resolves.toEqual(mockConnector);

expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: `/_connector`,
body: {
index_name: 'indexName',
language: 'en',
is_native: true,
name: '',
},
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: `/_connector/${connectorId}/_pipeline`,
body: { pipeline: mockPipeline },
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'GET',
path: `/_connector/${connectorId}`,
});
});

it('should update connector features if provided', async () => {
const connectorId = 'connectorId';
const mockConnector = {
id: connectorId,
index_name: 'indexName',
language: 'en',
is_native: true,
};

const mockFeatures = {
[FeatureName.FILTERING_ADVANCED_CONFIG]: true,
[FeatureName.FILTERING_RULES]: true,
[FeatureName.SYNC_RULES]: {
advanced: { enabled: true },
basic: { enabled: true },
},
};

mockClient.transport.request
.mockResolvedValueOnce({ id: connectorId })
.mockResolvedValueOnce({ result: 'updated' })
.mockResolvedValueOnce(mockConnector);

await expect(
createConnector(mockClient as unknown as ElasticsearchClient, {
isNative: true,
indexName: 'indexName',
language: 'en',
features: mockFeatures,
})
).resolves.toEqual(mockConnector);

expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: `/_connector`,
body: {
index_name: 'indexName',
language: 'en',
is_native: true,
name: '',
},
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: `/_connector/${connectorId}/_features`,
body: { features: mockFeatures },
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'GET',
path: `/_connector/${connectorId}`,
});
});

it('should throw an error if connector doc is not found', async () => {
mockClient.transport.request
.mockResolvedValueOnce({ id: 'connectorId' })
.mockRejectedValueOnce(notFoundError);

await expect(
createConnector(mockClient as unknown as ElasticsearchClient, {
isNative: true,
indexName: 'some-index',
language: 'somelang',
})
).rejects.toEqual(new Error('Could not retrieve the created connector'));

expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: `/_connector`,
body: {
index_name: 'some-index',
is_native: true,
language: 'somelang',
name: '',
},
});
});
});
51 changes: 39 additions & 12 deletions packages/kbn-search-connectors/lib/create_connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
*/

import { ElasticsearchClient } from '@kbn/core/server';
import { CURRENT_CONNECTORS_INDEX } from '..';
import { i18n } from '@kbn/i18n';
import { fetchConnectorById } from '..';

import { Connector, ConnectorConfiguration, IngestPipelineParams } from '../types/connectors';
import { createConnectorDocument } from './create_connector_document';

export const createConnector = async (
client: ElasticsearchClient,
Expand All @@ -23,19 +23,46 @@ export const createConnector = async (
name?: string;
pipeline?: IngestPipelineParams;
serviceType?: string | null;
instant_response?: boolean;
}
): Promise<Connector> => {
const document = createConnectorDocument({
...input,
serviceType: input.serviceType || null,
const { id: connectorId } = await client.transport.request<{ id: string }>({
method: 'POST',
path: `/_connector`,
body: {
...(input.indexName && { index_name: input.indexName }),
is_native: input.isNative,
...(input.language && { language: input.language }),
name: input.name || '',
...(input.serviceType && { service_type: input.serviceType }),
},
});

const result = await client.index({
document,
index: CURRENT_CONNECTORS_INDEX,
refresh: input.instant_response ? false : 'wait_for',
});
if (input.pipeline) {
await client.transport.request({
method: 'PUT',
path: `/_connector/${connectorId}/_pipeline`,
body: { pipeline: input.pipeline },
});
}

if (input.features) {
await client.transport.request({
method: 'PUT',
path: `/_connector/${connectorId}/_features`,
body: { features: input.features },
});
}

// createConnector function expects to return a Connector doc, so we fetch it from the index
const connector = await fetchConnectorById(client, connectorId);

if (!connector) {
throw new Error(
i18n.translate('searchConnectors.server.connectors.not_found_error', {
defaultMessage: 'Could not retrieve the created connector',
})
);
}

return { ...document, id: result._id };
return connector;
};
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ export function connectorStatusToText(connector: Connector): string {
);
}
if (
connector.error === SyncStatus.ERROR ||
connector.last_sync_error !== null ||
connector.last_access_control_sync_error !== null
connector.last_sync_status === SyncStatus.ERROR ||
connector.last_access_control_sync_status === SyncStatus.ERROR ||
connector.last_sync_error != null ||
connector.last_access_control_sync_error != null
) {
return i18n.translate(
'xpack.enterpriseSearch.content.searchIndices.connectorStatus.syncFailure.label',
Expand Down Expand Up @@ -87,9 +88,10 @@ export function connectorStatusToColor(connector: Connector): 'warning' | 'dange
if (
isLastSeenOld(connector) ||
connectorStatus === ConnectorStatus.ERROR ||
connector.error === SyncStatus.ERROR ||
connector.last_sync_error !== null ||
connector.last_access_control_sync_error !== null
connector.last_sync_status === SyncStatus.ERROR ||
connector.last_access_control_sync_status === SyncStatus.ERROR ||
connector.last_sync_error != null ||
connector.last_access_control_sync_error != null
) {
return 'danger';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export const registerConnectorsRoutes = ({ http, router }: RouteDependencies) =>
const { client } = (await context.core).elasticsearch;
const connector = await createConnector(client.asCurrentUser, {
indexName: null,
instant_response: true,
isNative: false,
language: null,
});
Expand Down

0 comments on commit 678ffa0

Please sign in to comment.