diff --git a/package.json b/package.json index 07645f27939f29..88fcbb1aa4d5e3 100644 --- a/package.json +++ b/package.json @@ -122,6 +122,7 @@ "@elastic/apm-rum": "^5.2.0", "@elastic/charts": "19.8.1", "@elastic/datemath": "5.0.3", + "@elastic/elasticsearch": "7.8.0", "@elastic/ems-client": "7.9.3", "@elastic/eui": "24.1.0", "@elastic/filesaver": "1.1.2", @@ -289,7 +290,6 @@ "devDependencies": { "@babel/parser": "^7.10.2", "@babel/types": "^7.10.2", - "@elastic/elasticsearch": "^7.4.0", "@elastic/eslint-config-kibana": "0.15.0", "@elastic/eslint-plugin-eui": "0.0.2", "@elastic/github-checks-reporter": "0.0.20b3", diff --git a/src/core/server/elasticsearch/client/client_config.test.ts b/src/core/server/elasticsearch/client/client_config.test.ts new file mode 100644 index 00000000000000..675d8840e7118e --- /dev/null +++ b/src/core/server/elasticsearch/client/client_config.test.ts @@ -0,0 +1,483 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { duration } from 'moment'; +import { ElasticsearchClientConfig, parseClientOptions } from './client_config'; + +const createConfig = ( + parts: Partial = {} +): ElasticsearchClientConfig => { + return { + customHeaders: {}, + logQueries: false, + sniffOnStart: false, + sniffOnConnectionFault: false, + sniffInterval: false, + requestHeadersWhitelist: ['authorization'], + hosts: ['http://localhost:80'], + ...parts, + }; +}; + +describe('parseClientOptions', () => { + describe('basic options', () => { + it('`customHeaders` option', () => { + const config = createConfig({ + customHeaders: { + foo: 'bar', + hello: 'dolly', + }, + }); + + expect(parseClientOptions(config, false)).toEqual( + expect.objectContaining({ + headers: { + foo: 'bar', + hello: 'dolly', + }, + }) + ); + }); + + it('`keepAlive` option', () => { + expect(parseClientOptions(createConfig({ keepAlive: true }), false)).toEqual( + expect.objectContaining({ agent: { keepAlive: true } }) + ); + expect(parseClientOptions(createConfig({ keepAlive: false }), false).agent).toBeUndefined(); + }); + + it('`sniffOnStart` options', () => { + expect( + parseClientOptions( + createConfig({ + sniffOnStart: true, + }), + false + ).sniffOnStart + ).toEqual(true); + + expect( + parseClientOptions( + createConfig({ + sniffOnStart: false, + }), + false + ).sniffOnStart + ).toEqual(false); + }); + it('`sniffOnConnectionFault` options', () => { + expect( + parseClientOptions( + createConfig({ + sniffOnConnectionFault: true, + }), + false + ).sniffOnConnectionFault + ).toEqual(true); + + expect( + parseClientOptions( + createConfig({ + sniffOnConnectionFault: false, + }), + false + ).sniffOnConnectionFault + ).toEqual(false); + }); + it('`sniffInterval` options', () => { + expect( + parseClientOptions( + createConfig({ + sniffInterval: false, + }), + false + ).sniffInterval + ).toEqual(false); + + expect( + parseClientOptions( + createConfig({ + sniffInterval: duration(100, 'ms'), + }), + false + ).sniffInterval + ).toEqual(100); + }); + + it('`hosts` option', () => { + const options = parseClientOptions( + createConfig({ + hosts: ['http://node-A:9200', 'http://node-B', 'https://node-C'], + }), + false + ); + + expect(options.nodes).toMatchInlineSnapshot(` + Array [ + Object { + "url": "http://node-a:9200/", + }, + Object { + "url": "http://node-b/", + }, + Object { + "url": "https://node-c/", + }, + ] + `); + }); + }); + + describe('authorization', () => { + describe('when `scoped` is false', () => { + it('adds the `auth` option if both `username` and `password` are set', () => { + expect( + parseClientOptions( + createConfig({ + username: 'user', + }), + false + ).auth + ).toBeUndefined(); + + expect( + parseClientOptions( + createConfig({ + password: 'pass', + }), + false + ).auth + ).toBeUndefined(); + + expect( + parseClientOptions( + createConfig({ + username: 'user', + password: 'pass', + }), + false + ) + ).toEqual( + expect.objectContaining({ + auth: { + username: 'user', + password: 'pass', + }, + }) + ); + }); + + it('adds auth to the nodes if both `username` and `password` are set', () => { + let options = parseClientOptions( + createConfig({ + username: 'user', + hosts: ['http://node-A:9200'], + }), + false + ); + expect(options.nodes).toMatchInlineSnapshot(` + Array [ + Object { + "url": "http://node-a:9200/", + }, + ] + `); + + options = parseClientOptions( + createConfig({ + password: 'pass', + hosts: ['http://node-A:9200'], + }), + false + ); + expect(options.nodes).toMatchInlineSnapshot(` + Array [ + Object { + "url": "http://node-a:9200/", + }, + ] + `); + + options = parseClientOptions( + createConfig({ + username: 'user', + password: 'pass', + hosts: ['http://node-A:9200'], + }), + false + ); + expect(options.nodes).toMatchInlineSnapshot(` + Array [ + Object { + "url": "http://user:pass@node-a:9200/", + }, + ] + `); + }); + }); + describe('when `scoped` is true', () => { + it('does not add the `auth` option even if both `username` and `password` are set', () => { + expect( + parseClientOptions( + createConfig({ + username: 'user', + password: 'pass', + }), + true + ).auth + ).toBeUndefined(); + }); + + it('does not add auth to the nodes even if both `username` and `password` are set', () => { + const options = parseClientOptions( + createConfig({ + username: 'user', + password: 'pass', + hosts: ['http://node-A:9200'], + }), + true + ); + expect(options.nodes).toMatchInlineSnapshot(` + Array [ + Object { + "url": "http://node-a:9200/", + }, + ] + `); + }); + }); + }); + + describe('ssl config', () => { + it('does not generate ssl option is ssl config is not set', () => { + expect(parseClientOptions(createConfig({}), false).ssl).toBeUndefined(); + expect(parseClientOptions(createConfig({}), true).ssl).toBeUndefined(); + }); + + it('handles the `certificateAuthorities` option', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { verificationMode: 'full', certificateAuthorities: ['content-of-ca-path'] }, + }), + false + ).ssl!.ca + ).toEqual(['content-of-ca-path']); + expect( + parseClientOptions( + createConfig({ + ssl: { verificationMode: 'full', certificateAuthorities: ['content-of-ca-path'] }, + }), + true + ).ssl!.ca + ).toEqual(['content-of-ca-path']); + }); + + describe('verificationMode', () => { + it('handles `none` value', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'none', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "rejectUnauthorized": false, + } + `); + }); + it('handles `certificate` value', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'certificate', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "checkServerIdentity": [Function], + "rejectUnauthorized": true, + } + `); + }); + it('handles `full` value', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "rejectUnauthorized": true, + } + `); + }); + it('throws for invalid values', () => { + expect( + () => + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'unknown' as any, + }, + }), + false + ).ssl + ).toThrowErrorMatchingInlineSnapshot(`"Unknown ssl verificationMode: unknown"`); + }); + it('throws for undefined values', () => { + expect( + () => + parseClientOptions( + createConfig({ + ssl: { + verificationMode: undefined as any, + }, + }), + false + ).ssl + ).toThrowErrorMatchingInlineSnapshot(`"Unknown ssl verificationMode: undefined"`); + }); + }); + + describe('`certificate`, `key` and `passphrase`', () => { + it('are not added if `key` is not present', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + certificate: 'content-of-cert', + keyPassphrase: 'passphrase', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "rejectUnauthorized": true, + } + `); + }); + + it('are not added if `certificate` is not present', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + key: 'content-of-key', + keyPassphrase: 'passphrase', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "rejectUnauthorized": true, + } + `); + }); + + it('are added if `key` and `certificate` are present and `scoped` is false', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + key: 'content-of-key', + certificate: 'content-of-cert', + keyPassphrase: 'passphrase', + }, + }), + false + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "cert": "content-of-cert", + "key": "content-of-key", + "passphrase": "passphrase", + "rejectUnauthorized": true, + } + `); + }); + + it('are not added if `scoped` is true unless `alwaysPresentCertificate` is true', () => { + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + key: 'content-of-key', + certificate: 'content-of-cert', + keyPassphrase: 'passphrase', + }, + }), + true + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "rejectUnauthorized": true, + } + `); + + expect( + parseClientOptions( + createConfig({ + ssl: { + verificationMode: 'full', + key: 'content-of-key', + certificate: 'content-of-cert', + keyPassphrase: 'passphrase', + alwaysPresentCertificate: true, + }, + }), + true + ).ssl + ).toMatchInlineSnapshot(` + Object { + "ca": undefined, + "cert": "content-of-cert", + "key": "content-of-key", + "passphrase": "passphrase", + "rejectUnauthorized": true, + } + `); + }); + }); + }); +}); diff --git a/src/core/server/elasticsearch/client/client_config.ts b/src/core/server/elasticsearch/client/client_config.ts new file mode 100644 index 00000000000000..f365ca331cfea9 --- /dev/null +++ b/src/core/server/elasticsearch/client/client_config.ts @@ -0,0 +1,158 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ConnectionOptions as TlsConnectionOptions } from 'tls'; +import { URL } from 'url'; +import { Duration } from 'moment'; +import { ClientOptions, NodeOptions } from '@elastic/elasticsearch'; +import { ElasticsearchConfig } from '../elasticsearch_config'; + +/** + * Configuration options to be used to create a {@link IClusterClient | cluster client} using the + * {@link ElasticsearchServiceStart.createClient | createClient API} + * + * @public + */ +export type ElasticsearchClientConfig = Pick< + ElasticsearchConfig, + | 'customHeaders' + | 'logQueries' + | 'sniffOnStart' + | 'sniffOnConnectionFault' + | 'requestHeadersWhitelist' + | 'sniffInterval' + | 'hosts' + | 'username' + | 'password' +> & { + pingTimeout?: ElasticsearchConfig['pingTimeout'] | ClientOptions['pingTimeout']; + requestTimeout?: ElasticsearchConfig['requestTimeout'] | ClientOptions['requestTimeout']; + ssl?: Partial; + keepAlive?: boolean; +}; + +/** + * Parse the client options from given client config and `scoped` flag. + * + * @param config The config to generate the client options from. + * @param scoped if true, will adapt the configuration to be used by a scoped client + * (will remove basic auth and ssl certificates) + */ +export function parseClientOptions( + config: ElasticsearchClientConfig, + scoped: boolean +): ClientOptions { + const clientOptions: ClientOptions = { + sniffOnStart: config.sniffOnStart, + sniffOnConnectionFault: config.sniffOnConnectionFault, + headers: config.customHeaders, + }; + + if (config.pingTimeout != null) { + clientOptions.pingTimeout = getDurationAsMs(config.pingTimeout); + } + if (config.requestTimeout != null) { + clientOptions.requestTimeout = getDurationAsMs(config.requestTimeout); + } + if (config.sniffInterval != null) { + clientOptions.sniffInterval = + typeof config.sniffInterval === 'boolean' + ? config.sniffInterval + : getDurationAsMs(config.sniffInterval); + } + if (config.keepAlive) { + clientOptions.agent = { + keepAlive: config.keepAlive, + }; + } + + if (config.username && config.password && !scoped) { + clientOptions.auth = { + username: config.username, + password: config.password, + }; + } + + clientOptions.nodes = config.hosts.map((host) => convertHost(host, !scoped, config)); + + if (config.ssl) { + clientOptions.ssl = generateSslConfig( + config.ssl, + scoped && !config.ssl.alwaysPresentCertificate + ); + } + + return clientOptions; +} + +const generateSslConfig = ( + sslConfig: Required['ssl'], + ignoreCertAndKey: boolean +): TlsConnectionOptions => { + const ssl: TlsConnectionOptions = { + ca: sslConfig.certificateAuthorities, + }; + + const verificationMode = sslConfig.verificationMode; + switch (verificationMode) { + case 'none': + ssl.rejectUnauthorized = false; + break; + case 'certificate': + ssl.rejectUnauthorized = true; + // by default, NodeJS is checking the server identify + ssl.checkServerIdentity = () => undefined; + break; + case 'full': + ssl.rejectUnauthorized = true; + break; + default: + throw new Error(`Unknown ssl verificationMode: ${verificationMode}`); + } + + // Add client certificate and key if required by elasticsearch + if (!ignoreCertAndKey && sslConfig.certificate && sslConfig.key) { + ssl.cert = sslConfig.certificate; + ssl.key = sslConfig.key; + ssl.passphrase = sslConfig.keyPassphrase; + } + + return ssl; +}; + +const convertHost = ( + host: string, + needAuth: boolean, + { username, password }: ElasticsearchClientConfig +): NodeOptions => { + const url = new URL(host); + const isHTTPS = url.protocol === 'https:'; + url.port = url.port || (isHTTPS ? '443' : '80'); + if (needAuth && username && password) { + url.username = username; + url.password = password; + } + + return { + url, + }; +}; + +const getDurationAsMs = (duration: number | Duration) => + typeof duration === 'number' ? duration : duration.asMilliseconds(); diff --git a/src/core/server/elasticsearch/client/cluster_client.test.mocks.ts b/src/core/server/elasticsearch/client/cluster_client.test.mocks.ts new file mode 100644 index 00000000000000..e08c0d55b45519 --- /dev/null +++ b/src/core/server/elasticsearch/client/cluster_client.test.mocks.ts @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export const configureClientMock = jest.fn(); +jest.doMock('./configure_client', () => ({ + configureClient: configureClientMock, +})); diff --git a/src/core/server/elasticsearch/client/cluster_client.test.ts b/src/core/server/elasticsearch/client/cluster_client.test.ts new file mode 100644 index 00000000000000..85517b80745f17 --- /dev/null +++ b/src/core/server/elasticsearch/client/cluster_client.test.ts @@ -0,0 +1,376 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { configureClientMock } from './cluster_client.test.mocks'; +import { loggingSystemMock } from '../../logging/logging_system.mock'; +import { httpServerMock } from '../../http/http_server.mocks'; +import { GetAuthHeaders } from '../../http'; +import { elasticsearchClientMock } from './mocks'; +import { ClusterClient } from './cluster_client'; +import { ElasticsearchClientConfig } from './client_config'; + +const createConfig = ( + parts: Partial = {} +): ElasticsearchClientConfig => { + return { + logQueries: false, + sniffOnStart: false, + sniffOnConnectionFault: false, + sniffInterval: false, + requestHeadersWhitelist: ['authorization'], + customHeaders: {}, + hosts: ['http://localhost'], + ...parts, + }; +}; + +describe('ClusterClient', () => { + let logger: ReturnType; + let getAuthHeaders: jest.MockedFunction; + let internalClient: ReturnType; + let scopedClient: ReturnType; + + beforeEach(() => { + logger = loggingSystemMock.createLogger(); + internalClient = elasticsearchClientMock.createInternalClient(); + scopedClient = elasticsearchClientMock.createInternalClient(); + getAuthHeaders = jest.fn().mockImplementation(() => ({ + authorization: 'auth', + foo: 'bar', + })); + + configureClientMock.mockImplementation((config, { scoped = false }) => { + return scoped ? scopedClient : internalClient; + }); + }); + + afterEach(() => { + configureClientMock.mockReset(); + }); + + it('creates a single internal and scoped client during initialization', () => { + const config = createConfig(); + + new ClusterClient(config, logger, getAuthHeaders); + + expect(configureClientMock).toHaveBeenCalledTimes(2); + expect(configureClientMock).toHaveBeenCalledWith(config, { logger }); + expect(configureClientMock).toHaveBeenCalledWith(config, { logger, scoped: true }); + }); + + describe('#asInternalUser', () => { + it('returns the internal client', () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + + expect(clusterClient.asInternalUser).toBe(internalClient); + }); + }); + + describe('#asScoped', () => { + it('returns a scoped cluster client bound to the request', () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest(); + + const scopedClusterClient = clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ headers: expect.any(Object) }); + + expect(scopedClusterClient.asInternalUser).toBe(clusterClient.asInternalUser); + expect(scopedClusterClient.asCurrentUser).toBe(scopedClient.child.mock.results[0].value); + }); + + it('returns a distinct scoped cluster client on each call', () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest(); + + const scopedClusterClient1 = clusterClient.asScoped(request); + const scopedClusterClient2 = clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(2); + + expect(scopedClusterClient1).not.toBe(scopedClusterClient2); + expect(scopedClusterClient1.asInternalUser).toBe(scopedClusterClient2.asInternalUser); + }); + + it('creates a scoped client with filtered request headers', () => { + const config = createConfig({ + requestHeadersWhitelist: ['foo'], + }); + getAuthHeaders.mockReturnValue({}); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({ + headers: { + foo: 'bar', + hello: 'dolly', + }, + }); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { foo: 'bar' }, + }); + }); + + it('creates a scoped facade with filtered auth headers', () => { + const config = createConfig({ + requestHeadersWhitelist: ['authorization'], + }); + getAuthHeaders.mockReturnValue({ + authorization: 'auth', + other: 'nope', + }); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({}); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { authorization: 'auth' }, + }); + }); + + it('respects auth headers precedence', () => { + const config = createConfig({ + requestHeadersWhitelist: ['authorization'], + }); + getAuthHeaders.mockReturnValue({ + authorization: 'auth', + other: 'nope', + }); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({ + headers: { + authorization: 'override', + }, + }); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { authorization: 'auth' }, + }); + }); + + it('includes the `customHeaders` from the config without filtering them', () => { + const config = createConfig({ + customHeaders: { + foo: 'bar', + hello: 'dolly', + }, + requestHeadersWhitelist: ['authorization'], + }); + getAuthHeaders.mockReturnValue({}); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({}); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { + foo: 'bar', + hello: 'dolly', + }, + }); + }); + + it('respect the precedence of auth headers over config headers', () => { + const config = createConfig({ + customHeaders: { + foo: 'config', + hello: 'dolly', + }, + requestHeadersWhitelist: ['foo'], + }); + getAuthHeaders.mockReturnValue({ + foo: 'auth', + }); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({}); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { + foo: 'auth', + hello: 'dolly', + }, + }); + }); + + it('respect the precedence of request headers over config headers', () => { + const config = createConfig({ + customHeaders: { + foo: 'config', + hello: 'dolly', + }, + requestHeadersWhitelist: ['foo'], + }); + getAuthHeaders.mockReturnValue({}); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = httpServerMock.createKibanaRequest({ + headers: { foo: 'request' }, + }); + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { + foo: 'request', + hello: 'dolly', + }, + }); + }); + + it('filter headers when called with a `FakeRequest`', () => { + const config = createConfig({ + requestHeadersWhitelist: ['authorization'], + }); + getAuthHeaders.mockReturnValue({}); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = { + headers: { + authorization: 'auth', + hello: 'dolly', + }, + }; + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { authorization: 'auth' }, + }); + }); + + it('does not add auth headers when called with a `FakeRequest`', () => { + const config = createConfig({ + requestHeadersWhitelist: ['authorization', 'foo'], + }); + getAuthHeaders.mockReturnValue({ + authorization: 'auth', + }); + + const clusterClient = new ClusterClient(config, logger, getAuthHeaders); + const request = { + headers: { + foo: 'bar', + hello: 'dolly', + }, + }; + + clusterClient.asScoped(request); + + expect(scopedClient.child).toHaveBeenCalledTimes(1); + expect(scopedClient.child).toHaveBeenCalledWith({ + headers: { foo: 'bar' }, + }); + }); + }); + + describe('#close', () => { + it('closes both underlying clients', async () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + + await clusterClient.close(); + + expect(internalClient.close).toHaveBeenCalledTimes(1); + expect(scopedClient.close).toHaveBeenCalledTimes(1); + }); + + it('waits for both clients to close', async (done) => { + expect.assertions(4); + + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + + let internalClientClosed = false; + let scopedClientClosed = false; + let clusterClientClosed = false; + + let closeInternalClient: () => void; + let closeScopedClient: () => void; + + internalClient.close.mockReturnValue( + new Promise((resolve) => { + closeInternalClient = resolve; + }).then(() => { + expect(clusterClientClosed).toBe(false); + internalClientClosed = true; + }) + ); + scopedClient.close.mockReturnValue( + new Promise((resolve) => { + closeScopedClient = resolve; + }).then(() => { + expect(clusterClientClosed).toBe(false); + scopedClientClosed = true; + }) + ); + + clusterClient.close().then(() => { + clusterClientClosed = true; + expect(internalClientClosed).toBe(true); + expect(scopedClientClosed).toBe(true); + done(); + }); + + closeInternalClient!(); + closeScopedClient!(); + }); + + it('return a rejected promise is any client rejects', async () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + + internalClient.close.mockRejectedValue(new Error('error closing client')); + + expect(clusterClient.close()).rejects.toThrowErrorMatchingInlineSnapshot( + `"error closing client"` + ); + }); + + it('does nothing after the first call', async () => { + const clusterClient = new ClusterClient(createConfig(), logger, getAuthHeaders); + + await clusterClient.close(); + + expect(internalClient.close).toHaveBeenCalledTimes(1); + expect(scopedClient.close).toHaveBeenCalledTimes(1); + + await clusterClient.close(); + await clusterClient.close(); + + expect(internalClient.close).toHaveBeenCalledTimes(1); + expect(scopedClient.close).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/src/core/server/elasticsearch/client/cluster_client.ts b/src/core/server/elasticsearch/client/cluster_client.ts new file mode 100644 index 00000000000000..d9a0e6fe3f2384 --- /dev/null +++ b/src/core/server/elasticsearch/client/cluster_client.ts @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Client } from '@elastic/elasticsearch'; +import { Logger } from '../../logging'; +import { GetAuthHeaders, isRealRequest, Headers } from '../../http'; +import { ensureRawRequest, filterHeaders } from '../../http/router'; +import { ScopeableRequest } from '../types'; +import { ElasticsearchClient } from './types'; +import { configureClient } from './configure_client'; +import { ElasticsearchClientConfig } from './client_config'; +import { ScopedClusterClient, IScopedClusterClient } from './scoped_cluster_client'; + +const noop = () => undefined; + +/** + * Represents an Elasticsearch cluster API client created by the platform. + * It allows to call API on behalf of the internal Kibana user and + * the actual user that is derived from the request headers (via `asScoped(...)`). + * + * @public + **/ +export interface IClusterClient { + /** + * A {@link ElasticsearchClient | client} to be used to query the ES cluster on behalf of the Kibana internal user + */ + readonly asInternalUser: ElasticsearchClient; + /** + * Creates a {@link IScopedClusterClient | scoped cluster client} bound to given {@link ScopeableRequest | request} + */ + asScoped: (request: ScopeableRequest) => IScopedClusterClient; +} + +/** + * See {@link IClusterClient} + * + * @public + */ +export interface ICustomClusterClient extends IClusterClient { + /** + * Closes the cluster client. After that client cannot be used and one should + * create a new client instance to be able to interact with Elasticsearch API. + */ + close: () => Promise; +} + +/** @internal **/ +export class ClusterClient implements ICustomClusterClient { + public readonly asInternalUser: Client; + private readonly rootScopedClient: Client; + + private isClosed = false; + + constructor( + private readonly config: ElasticsearchClientConfig, + logger: Logger, + private readonly getAuthHeaders: GetAuthHeaders = noop + ) { + this.asInternalUser = configureClient(config, { logger }); + this.rootScopedClient = configureClient(config, { logger, scoped: true }); + } + + asScoped(request: ScopeableRequest) { + const scopedHeaders = this.getScopedHeaders(request); + const scopedClient = this.rootScopedClient.child({ + headers: scopedHeaders, + }); + return new ScopedClusterClient(this.asInternalUser, scopedClient); + } + + public async close() { + if (this.isClosed) { + return; + } + this.isClosed = true; + await Promise.all([this.asInternalUser.close(), this.rootScopedClient.close()]); + } + + private getScopedHeaders(request: ScopeableRequest): Headers { + let scopedHeaders: Headers; + if (isRealRequest(request)) { + const authHeaders = this.getAuthHeaders(request); + const requestHeaders = ensureRawRequest(request).headers; + scopedHeaders = filterHeaders( + { ...requestHeaders, ...authHeaders }, + this.config.requestHeadersWhitelist + ); + } else { + scopedHeaders = filterHeaders(request?.headers ?? {}, this.config.requestHeadersWhitelist); + } + + return { + ...this.config.customHeaders, + ...scopedHeaders, + }; + } +} diff --git a/src/core/server/elasticsearch/client/configure_client.test.mocks.ts b/src/core/server/elasticsearch/client/configure_client.test.mocks.ts new file mode 100644 index 00000000000000..0a74f57120fb01 --- /dev/null +++ b/src/core/server/elasticsearch/client/configure_client.test.mocks.ts @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export const parseClientOptionsMock = jest.fn(); +jest.doMock('./client_config', () => ({ + parseClientOptions: parseClientOptionsMock, +})); + +export const ClientMock = jest.fn(); +jest.doMock('@elastic/elasticsearch', () => { + const actual = jest.requireActual('@elastic/elasticsearch'); + return { + ...actual, + Client: ClientMock, + }; +}); diff --git a/src/core/server/elasticsearch/client/configure_client.test.ts b/src/core/server/elasticsearch/client/configure_client.test.ts new file mode 100644 index 00000000000000..32da142764a786 --- /dev/null +++ b/src/core/server/elasticsearch/client/configure_client.test.ts @@ -0,0 +1,279 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { RequestEvent, errors } from '@elastic/elasticsearch'; +import { TransportRequestParams } from '@elastic/elasticsearch/lib/Transport'; + +import { parseClientOptionsMock, ClientMock } from './configure_client.test.mocks'; +import { loggingSystemMock } from '../../logging/logging_system.mock'; +import EventEmitter from 'events'; +import type { ElasticsearchClientConfig } from './client_config'; +import { configureClient } from './configure_client'; + +const createFakeConfig = ( + parts: Partial = {} +): ElasticsearchClientConfig => { + return ({ + type: 'fake-config', + ...parts, + } as unknown) as ElasticsearchClientConfig; +}; + +const createFakeClient = () => { + const client = new EventEmitter(); + jest.spyOn(client, 'on'); + return client; +}; + +const createApiResponse = ({ + body, + statusCode = 200, + headers = {}, + warnings = [], + params, +}: { + body: T; + statusCode?: number; + headers?: Record; + warnings?: string[]; + params?: TransportRequestParams; +}): RequestEvent => { + return { + body, + statusCode, + headers, + warnings, + meta: { + request: { + params: params!, + } as any, + } as any, + }; +}; + +describe('configureClient', () => { + let logger: ReturnType; + let config: ElasticsearchClientConfig; + + beforeEach(() => { + logger = loggingSystemMock.createLogger(); + config = createFakeConfig(); + parseClientOptionsMock.mockReturnValue({}); + ClientMock.mockImplementation(() => createFakeClient()); + }); + + afterEach(() => { + parseClientOptionsMock.mockReset(); + ClientMock.mockReset(); + }); + + it('calls `parseClientOptions` with the correct parameters', () => { + configureClient(config, { logger, scoped: false }); + + expect(parseClientOptionsMock).toHaveBeenCalledTimes(1); + expect(parseClientOptionsMock).toHaveBeenCalledWith(config, false); + + parseClientOptionsMock.mockClear(); + + configureClient(config, { logger, scoped: true }); + + expect(parseClientOptionsMock).toHaveBeenCalledTimes(1); + expect(parseClientOptionsMock).toHaveBeenCalledWith(config, true); + }); + + it('constructs a client using the options returned by `parseClientOptions`', () => { + const parsedOptions = { + nodes: ['http://localhost'], + }; + parseClientOptionsMock.mockReturnValue(parsedOptions); + + const client = configureClient(config, { logger, scoped: false }); + + expect(ClientMock).toHaveBeenCalledTimes(1); + expect(ClientMock).toHaveBeenCalledWith(parsedOptions); + expect(client).toBe(ClientMock.mock.results[0].value); + }); + + it('listens to client on `response` events', () => { + const client = configureClient(config, { logger, scoped: false }); + + expect(client.on).toHaveBeenCalledTimes(1); + expect(client.on).toHaveBeenCalledWith('response', expect.any(Function)); + }); + + describe('Client logging', () => { + it('logs error when the client emits an error', () => { + const client = configureClient(config, { logger, scoped: false }); + + const response = createApiResponse({ + body: { + error: { + type: 'error message', + }, + }, + }); + client.emit('response', new errors.ResponseError(response), null); + client.emit('response', new Error('some error'), null); + + expect(loggingSystemMock.collect(logger).error).toMatchInlineSnapshot(` + Array [ + Array [ + "ResponseError: error message", + ], + Array [ + "Error: some error", + ], + ] + `); + }); + + it('logs each queries if `logQueries` is true', () => { + const client = configureClient( + createFakeConfig({ + logQueries: true, + }), + { logger, scoped: false } + ); + + const response = createApiResponse({ + body: {}, + statusCode: 200, + params: { + method: 'GET', + path: '/foo', + querystring: { hello: 'dolly' }, + }, + }); + + client.emit('response', null, response); + + expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(` + Array [ + Array [ + "200 + GET /foo + hello=dolly", + Object { + "tags": Array [ + "query", + ], + }, + ], + ] + `); + }); + + it('properly encode queries', () => { + const client = configureClient( + createFakeConfig({ + logQueries: true, + }), + { logger, scoped: false } + ); + + const response = createApiResponse({ + body: {}, + statusCode: 200, + params: { + method: 'GET', + path: '/foo', + querystring: { city: 'Münich' }, + }, + }); + + client.emit('response', null, response); + + expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(` + Array [ + Array [ + "200 + GET /foo + city=M%C3%BCnich", + Object { + "tags": Array [ + "query", + ], + }, + ], + ] + `); + }); + + it('logs queries even in case of errors if `logQueries` is true', () => { + const client = configureClient( + createFakeConfig({ + logQueries: true, + }), + { logger, scoped: false } + ); + + const response = createApiResponse({ + statusCode: 500, + body: { + error: { + type: 'internal server error', + }, + }, + params: { + method: 'GET', + path: '/foo', + querystring: { hello: 'dolly' }, + }, + }); + client.emit('response', new errors.ResponseError(response), response); + + expect(loggingSystemMock.collect(logger).debug).toMatchInlineSnapshot(` + Array [ + Array [ + "500 + GET /foo + hello=dolly", + Object { + "tags": Array [ + "query", + ], + }, + ], + ] + `); + }); + + it('does not log queries if `logQueries` is false', () => { + const client = configureClient( + createFakeConfig({ + logQueries: false, + }), + { logger, scoped: false } + ); + + const response = createApiResponse({ + body: {}, + statusCode: 200, + params: { + method: 'GET', + path: '/foo', + }, + }); + + client.emit('response', null, response); + + expect(logger.debug).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/core/server/elasticsearch/client/configure_client.ts b/src/core/server/elasticsearch/client/configure_client.ts new file mode 100644 index 00000000000000..5377f8ca1b0706 --- /dev/null +++ b/src/core/server/elasticsearch/client/configure_client.ts @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { stringify } from 'querystring'; +import { Client } from '@elastic/elasticsearch'; +import { Logger } from '../../logging'; +import { parseClientOptions, ElasticsearchClientConfig } from './client_config'; + +export const configureClient = ( + config: ElasticsearchClientConfig, + { logger, scoped = false }: { logger: Logger; scoped?: boolean } +): Client => { + const clientOptions = parseClientOptions(config, scoped); + + const client = new Client(clientOptions); + addLogging(client, logger, config.logQueries); + + return client; +}; + +const addLogging = (client: Client, logger: Logger, logQueries: boolean) => { + client.on('response', (err, event) => { + if (err) { + logger.error(`${err.name}: ${err.message}`); + } + if (event && logQueries) { + const params = event.meta.request.params; + + // definition is wrong, `params.querystring` can be either a string or an object + const querystring = convertQueryString(params.querystring); + + logger.debug( + `${event.statusCode}\n${params.method} ${params.path}${ + querystring ? `\n${querystring}` : '' + }`, + { + tags: ['query'], + } + ); + } + }); +}; + +const convertQueryString = (qs: string | Record | undefined): string => { + if (qs === undefined || typeof qs === 'string') { + return qs ?? ''; + } + return stringify(qs); +}; diff --git a/src/core/server/elasticsearch/client/index.ts b/src/core/server/elasticsearch/client/index.ts new file mode 100644 index 00000000000000..18e84482024ca1 --- /dev/null +++ b/src/core/server/elasticsearch/client/index.ts @@ -0,0 +1,24 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export { ElasticsearchClient } from './types'; +export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_client'; +export { ElasticsearchClientConfig } from './client_config'; +export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client'; +export { configureClient } from './configure_client'; diff --git a/src/core/server/elasticsearch/client/mocks.test.ts b/src/core/server/elasticsearch/client/mocks.test.ts new file mode 100644 index 00000000000000..b882f8d0c5d79c --- /dev/null +++ b/src/core/server/elasticsearch/client/mocks.test.ts @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { elasticsearchClientMock } from './mocks'; + +describe('Mocked client', () => { + let client: ReturnType; + + const expectMocked = (fn: jest.MockedFunction | undefined) => { + expect(fn).toBeDefined(); + expect(fn.mockReturnValue).toEqual(expect.any(Function)); + }; + + beforeEach(() => { + client = elasticsearchClientMock.createInternalClient(); + }); + + it('`transport.request` should be mocked', () => { + expectMocked(client.transport.request); + }); + + it('root level API methods should be mocked', () => { + expectMocked(client.bulk); + expectMocked(client.search); + }); + + it('nested level API methods should be mocked', () => { + expectMocked(client.asyncSearch.get); + expectMocked(client.nodes.info); + }); + + it('`close` should be mocked', () => { + expectMocked(client.close); + }); + + it('`child` should be mocked and return a mocked Client', () => { + expectMocked(client.child); + + const child = client.child(); + + expect(child).not.toBe(client); + expectMocked(child.search); + }); +}); diff --git a/src/core/server/elasticsearch/client/mocks.ts b/src/core/server/elasticsearch/client/mocks.ts new file mode 100644 index 00000000000000..75644435a7f2a7 --- /dev/null +++ b/src/core/server/elasticsearch/client/mocks.ts @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Client, ApiResponse } from '@elastic/elasticsearch'; +import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; +import { ElasticsearchClient } from './types'; +import { ICustomClusterClient } from './cluster_client'; + +const createInternalClientMock = (): DeeplyMockedKeys => { + // we mimic 'reflection' on a concrete instance of the client to generate the mocked functions. + const client = new Client({ + node: 'http://localhost', + }) as any; + + const blackListedProps = [ + '_events', + '_eventsCount', + '_maxListeners', + 'name', + 'serializer', + 'connectionPool', + 'transport', + 'helpers', + ]; + + const mockify = (obj: Record, blacklist: string[] = []) => { + Object.keys(obj) + .filter((key) => !blacklist.includes(key)) + .forEach((key) => { + const propType = typeof obj[key]; + if (propType === 'function') { + obj[key] = jest.fn(); + } else if (propType === 'object' && obj[key] != null) { + mockify(obj[key]); + } + }); + }; + + mockify(client, blackListedProps); + + client.transport = { + request: jest.fn(), + }; + client.close = jest.fn().mockReturnValue(Promise.resolve()); + client.child = jest.fn().mockImplementation(() => createInternalClientMock()); + + return (client as unknown) as DeeplyMockedKeys; +}; + +export type ElasticSearchClientMock = DeeplyMockedKeys; + +const createClientMock = (): ElasticSearchClientMock => + (createInternalClientMock() as unknown) as ElasticSearchClientMock; + +interface ScopedClusterClientMock { + asInternalUser: ElasticSearchClientMock; + asCurrentUser: ElasticSearchClientMock; +} + +const createScopedClusterClientMock = () => { + const mock: ScopedClusterClientMock = { + asInternalUser: createClientMock(), + asCurrentUser: createClientMock(), + }; + + return mock; +}; + +export interface ClusterClientMock { + asInternalUser: ElasticSearchClientMock; + asScoped: jest.MockedFunction<() => ScopedClusterClientMock>; +} + +const createClusterClientMock = () => { + const mock: ClusterClientMock = { + asInternalUser: createClientMock(), + asScoped: jest.fn(), + }; + + mock.asScoped.mockReturnValue(createScopedClusterClientMock()); + + return mock; +}; + +export type CustomClusterClientMock = jest.Mocked & ClusterClientMock; + +const createCustomClusterClientMock = () => { + const mock: CustomClusterClientMock = { + asInternalUser: createClientMock(), + asScoped: jest.fn(), + close: jest.fn(), + }; + + mock.asScoped.mockReturnValue(createScopedClusterClientMock()); + mock.close.mockReturnValue(Promise.resolve()); + + return mock; +}; + +export type MockedTransportRequestPromise = TransportRequestPromise & { + abort: jest.MockedFunction<() => undefined>; +}; + +const createMockedClientResponse = (body: T): MockedTransportRequestPromise> => { + const response: ApiResponse = { + body, + statusCode: 200, + warnings: [], + headers: {}, + meta: {} as any, + }; + const promise = Promise.resolve(response); + (promise as MockedTransportRequestPromise>).abort = jest.fn(); + + return promise as MockedTransportRequestPromise>; +}; + +const createMockedClientError = (err: any): MockedTransportRequestPromise => { + const promise = Promise.reject(err); + (promise as MockedTransportRequestPromise).abort = jest.fn(); + return promise as MockedTransportRequestPromise; +}; + +export const elasticsearchClientMock = { + createClusterClient: createClusterClientMock, + createCustomClusterClient: createCustomClusterClientMock, + createScopedClusterClient: createScopedClusterClientMock, + createElasticSearchClient: createClientMock, + createInternalClient: createInternalClientMock, + createClientResponse: createMockedClientResponse, + createClientError: createMockedClientError, +}; diff --git a/src/core/server/elasticsearch/client/scoped_cluster_client.test.ts b/src/core/server/elasticsearch/client/scoped_cluster_client.test.ts new file mode 100644 index 00000000000000..78ca8fcbd3c073 --- /dev/null +++ b/src/core/server/elasticsearch/client/scoped_cluster_client.test.ts @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { elasticsearchClientMock } from './mocks'; +import { ScopedClusterClient } from './scoped_cluster_client'; + +describe('ScopedClusterClient', () => { + it('uses the internal client passed in the constructor', () => { + const internalClient = elasticsearchClientMock.createElasticSearchClient(); + const scopedClient = elasticsearchClientMock.createElasticSearchClient(); + + const scopedClusterClient = new ScopedClusterClient(internalClient, scopedClient); + + expect(scopedClusterClient.asInternalUser).toBe(internalClient); + }); + + it('uses the scoped client passed in the constructor', () => { + const internalClient = elasticsearchClientMock.createElasticSearchClient(); + const scopedClient = elasticsearchClientMock.createElasticSearchClient(); + + const scopedClusterClient = new ScopedClusterClient(internalClient, scopedClient); + + expect(scopedClusterClient.asCurrentUser).toBe(scopedClient); + }); +}); diff --git a/src/core/server/elasticsearch/client/scoped_cluster_client.ts b/src/core/server/elasticsearch/client/scoped_cluster_client.ts new file mode 100644 index 00000000000000..1af7948a65e168 --- /dev/null +++ b/src/core/server/elasticsearch/client/scoped_cluster_client.ts @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ElasticsearchClient } from './types'; + +/** + * Serves the same purpose as the normal {@link ClusterClient | cluster client} but exposes + * an additional `asCurrentUser` method that doesn't use credentials of the Kibana internal + * user (as `asInternalUser` does) to request Elasticsearch API, but rather passes HTTP headers + * extracted from the current user request to the API instead. + * + * @public + **/ +export interface IScopedClusterClient { + /** + * A {@link ElasticsearchClient | client} to be used to query the elasticsearch cluster + * on behalf of the internal Kibana user. + */ + readonly asInternalUser: ElasticsearchClient; + /** + * A {@link ElasticsearchClient | client} to be used to query the elasticsearch cluster + * on behalf of the user that initiated the request to the Kibana server. + */ + readonly asCurrentUser: ElasticsearchClient; +} + +/** @internal **/ +export class ScopedClusterClient implements IScopedClusterClient { + constructor( + public readonly asInternalUser: ElasticsearchClient, + public readonly asCurrentUser: ElasticsearchClient + ) {} +} diff --git a/src/core/server/elasticsearch/client/types.ts b/src/core/server/elasticsearch/client/types.ts new file mode 100644 index 00000000000000..934120c330e920 --- /dev/null +++ b/src/core/server/elasticsearch/client/types.ts @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import type { Client } from '@elastic/elasticsearch'; +import type { + ApiResponse, + TransportRequestOptions, + TransportRequestParams, +} from '@elastic/elasticsearch/lib/Transport'; + +/** + * Client used to query the elasticsearch cluster. + * + * @public + */ +export type ElasticsearchClient = Omit< + Client, + 'connectionPool' | 'transport' | 'serializer' | 'extend' | 'helpers' | 'child' | 'close' +> & { + transport: { + request( + params: TransportRequestParams, + options?: TransportRequestOptions + ): Promise; + }; +}; diff --git a/src/core/server/elasticsearch/elasticsearch_service.mock.ts b/src/core/server/elasticsearch/elasticsearch_service.mock.ts index f524781de4c7ed..b97f6df6b0afcf 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.mock.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.mock.ts @@ -19,6 +19,11 @@ import { BehaviorSubject } from 'rxjs'; import { ILegacyClusterClient, ILegacyCustomClusterClient } from './legacy'; +import { + elasticsearchClientMock, + ClusterClientMock, + CustomClusterClientMock, +} from './client/mocks'; import { legacyClientMock } from './legacy/mocks'; import { ElasticsearchConfig } from './elasticsearch_config'; import { ElasticsearchService } from './elasticsearch_service'; @@ -33,6 +38,13 @@ interface MockedElasticSearchServiceSetup { }; } +type MockedElasticSearchServiceStart = MockedElasticSearchServiceSetup; + +interface MockedInternalElasticSearchServiceStart extends MockedElasticSearchServiceStart { + client: ClusterClientMock; + createClient: jest.MockedFunction<() => CustomClusterClientMock>; +} + const createSetupContractMock = () => { const setupContract: MockedElasticSearchServiceSetup = { legacy: { @@ -47,8 +59,6 @@ const createSetupContractMock = () => { return setupContract; }; -type MockedElasticSearchServiceStart = MockedElasticSearchServiceSetup; - const createStartContractMock = () => { const startContract: MockedElasticSearchServiceStart = { legacy: { @@ -60,6 +70,17 @@ const createStartContractMock = () => { startContract.legacy.client.asScoped.mockReturnValue( legacyClientMock.createScopedClusterClient() ); + return startContract; +}; + +const createInternalStartContractMock = () => { + const startContract: MockedInternalElasticSearchServiceStart = { + ...createStartContractMock(), + client: elasticsearchClientMock.createClusterClient(), + createClient: jest.fn(), + }; + + startContract.createClient.mockReturnValue(elasticsearchClientMock.createCustomClusterClient()); return startContract; }; @@ -100,7 +121,7 @@ const createMock = () => { stop: jest.fn(), }; mocked.setup.mockResolvedValue(createInternalSetupContractMock()); - mocked.start.mockResolvedValueOnce(createStartContractMock()); + mocked.start.mockResolvedValueOnce(createInternalStartContractMock()); mocked.stop.mockResolvedValue(); return mocked; }; @@ -109,6 +130,7 @@ export const elasticsearchServiceMock = { create: createMock, createInternalSetup: createInternalSetupContractMock, createSetup: createSetupContractMock, + createInternalStart: createInternalStartContractMock, createStart: createStartContractMock, createLegacyClusterClient: legacyClientMock.createClusterClient, createLegacyCustomClusterClient: legacyClientMock.createCustomClusterClient, diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.mocks.ts b/src/core/server/elasticsearch/elasticsearch_service.test.mocks.ts index c30230a7847a0e..955ab197ffce1e 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.mocks.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.mocks.ts @@ -17,5 +17,8 @@ * under the License. */ +export const MockLegacyClusterClient = jest.fn(); +jest.mock('./legacy/cluster_client', () => ({ LegacyClusterClient: MockLegacyClusterClient })); + export const MockClusterClient = jest.fn(); -jest.mock('./legacy/cluster_client', () => ({ LegacyClusterClient: MockClusterClient })); +jest.mock('./client/cluster_client', () => ({ ClusterClient: MockClusterClient })); diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 8f3dc5688f6fc0..b36af2a7e4671f 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -19,7 +19,7 @@ import { first } from 'rxjs/operators'; -import { MockClusterClient } from './elasticsearch_service.test.mocks'; +import { MockLegacyClusterClient, MockClusterClient } from './elasticsearch_service.test.mocks'; import { BehaviorSubject } from 'rxjs'; import { Env } from '../config'; @@ -28,9 +28,11 @@ import { CoreContext } from '../core_context'; import { configServiceMock } from '../config/config_service.mock'; import { loggingSystemMock } from '../logging/logging_system.mock'; import { httpServiceMock } from '../http/http_service.mock'; +import { auditTrailServiceMock } from '../audit_trail/audit_trail_service.mock'; import { ElasticsearchConfig } from './elasticsearch_config'; import { ElasticsearchService } from './elasticsearch_service'; import { elasticsearchServiceMock } from './elasticsearch_service.mock'; +import { elasticsearchClientMock } from './client/mocks'; import { duration } from 'moment'; const delay = async (durationMs: number) => @@ -38,9 +40,12 @@ const delay = async (durationMs: number) => let elasticsearchService: ElasticsearchService; const configService = configServiceMock.create(); -const deps = { +const setupDeps = { http: httpServiceMock.createInternalSetupContract(), }; +const startDeps = { + auditTrail: auditTrailServiceMock.createStartContract(), +}; configService.atPath.mockReturnValue( new BehaviorSubject({ hosts: ['http://1.2.3.4'], @@ -56,49 +61,58 @@ configService.atPath.mockReturnValue( let env: Env; let coreContext: CoreContext; const logger = loggingSystemMock.create(); + +let mockClusterClientInstance: ReturnType; +let mockLegacyClusterClientInstance: ReturnType; + beforeEach(() => { env = Env.createDefault(getEnvOptions()); coreContext = { coreId: Symbol(), env, logger, configService: configService as any }; elasticsearchService = new ElasticsearchService(coreContext); + + MockLegacyClusterClient.mockClear(); + MockClusterClient.mockClear(); + + mockLegacyClusterClientInstance = elasticsearchServiceMock.createLegacyCustomClusterClient(); + MockLegacyClusterClient.mockImplementation(() => mockLegacyClusterClientInstance); + mockClusterClientInstance = elasticsearchClientMock.createCustomClusterClient(); + MockClusterClient.mockImplementation(() => mockClusterClientInstance); }); afterEach(() => jest.clearAllMocks()); describe('#setup', () => { it('returns legacy Elasticsearch config as a part of the contract', async () => { - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); await expect(setupContract.legacy.config$.pipe(first()).toPromise()).resolves.toBeInstanceOf( ElasticsearchConfig ); }); - it('returns elasticsearch client as a part of the contract', async () => { - const mockClusterClientInstance = elasticsearchServiceMock.createLegacyClusterClient(); - MockClusterClient.mockImplementationOnce(() => mockClusterClientInstance); - - const setupContract = await elasticsearchService.setup(deps); + it('returns legacy elasticsearch client as a part of the contract', async () => { + const setupContract = await elasticsearchService.setup(setupDeps); const client = setupContract.legacy.client; - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); await client.callAsInternalUser('any'); - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); }); - describe('#createClient', () => { + describe('#createLegacyClient', () => { it('allows to specify config properties', async () => { - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); - const mockClusterClientInstance = { close: jest.fn() }; - MockClusterClient.mockImplementation(() => mockClusterClientInstance); + // reset all mocks called during setup phase + MockLegacyClusterClient.mockClear(); const customConfig = { logQueries: true }; const clusterClient = setupContract.legacy.createClient('some-custom-type', customConfig); - expect(clusterClient).toBe(mockClusterClientInstance); + expect(clusterClient).toBe(mockLegacyClusterClientInstance); - expect(MockClusterClient).toHaveBeenCalledWith( + expect(MockLegacyClusterClient).toHaveBeenCalledWith( expect.objectContaining(customConfig), expect.objectContaining({ context: ['elasticsearch', 'some-custom-type'] }), expect.any(Function), @@ -107,9 +121,10 @@ describe('#setup', () => { }); it('falls back to elasticsearch default config values if property not specified', async () => { - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); + // reset all mocks called during setup phase - MockClusterClient.mockClear(); + MockLegacyClusterClient.mockClear(); const customConfig = { hosts: ['http://8.8.8.8'], @@ -118,7 +133,7 @@ describe('#setup', () => { }; setupContract.legacy.createClient('some-custom-type', customConfig); - const config = MockClusterClient.mock.calls[0][0]; + const config = MockLegacyClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` Object { "healthCheckDelay": "PT0.01S", @@ -137,13 +152,14 @@ describe('#setup', () => { `); }); it('falls back to elasticsearch config if custom config not passed', async () => { - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); + // reset all mocks called during setup phase - MockClusterClient.mockClear(); + MockLegacyClusterClient.mockClear(); setupContract.legacy.createClient('another-type'); - const config = MockClusterClient.mock.calls[0][0]; + const config = MockLegacyClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` Object { "healthCheckDelay": "PT0.01S", @@ -178,9 +194,10 @@ describe('#setup', () => { } as any) ); elasticsearchService = new ElasticsearchService(coreContext); - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); + // reset all mocks called during setup phase - MockClusterClient.mockClear(); + MockLegacyClusterClient.mockClear(); const customConfig = { hosts: ['http://8.8.8.8'], @@ -189,7 +206,7 @@ describe('#setup', () => { }; setupContract.legacy.createClient('some-custom-type', customConfig); - const config = MockClusterClient.mock.calls[0][0]; + const config = MockLegacyClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` Object { "healthCheckDelay": "PT2S", @@ -210,66 +227,142 @@ describe('#setup', () => { }); it('esNodeVersionCompatibility$ only starts polling when subscribed to', async (done) => { - const clusterClientInstance = elasticsearchServiceMock.createLegacyClusterClient(); - MockClusterClient.mockImplementationOnce(() => clusterClientInstance); + mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); - clusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); - - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); await delay(10); - expect(clusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); setupContract.esNodesCompatibility$.subscribe(() => { - expect(clusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); done(); }); }); it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async (done) => { - const mockClusterClientInstance = elasticsearchServiceMock.createLegacyClusterClient(); - MockClusterClient.mockImplementationOnce(() => mockClusterClientInstance); - - mockClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); const sub = setupContract.esNodesCompatibility$.subscribe(async () => { sub.unsubscribe(); await delay(100); - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); done(); }); }); }); -describe('#stop', () => { - it('stops both admin and data clients', async () => { - const mockClusterClientInstance = { close: jest.fn() }; - MockClusterClient.mockImplementationOnce(() => mockClusterClientInstance); +describe('#start', () => { + it('throws if called before `setup`', async () => { + expect(() => elasticsearchService.start(startDeps)).rejects.toMatchInlineSnapshot( + `[Error: ElasticsearchService needs to be setup before calling start]` + ); + }); + + it('returns elasticsearch client as a part of the contract', async () => { + await elasticsearchService.setup(setupDeps); + const startContract = await elasticsearchService.start(startDeps); + const client = startContract.client; + + expect(client.asInternalUser).toBe(mockClusterClientInstance.asInternalUser); + }); + + describe('#createClient', () => { + it('allows to specify config properties', async () => { + await elasticsearchService.setup(setupDeps); + const startContract = await elasticsearchService.start(startDeps); + + // reset all mocks called during setup phase + MockClusterClient.mockClear(); + + const customConfig = { logQueries: true }; + const clusterClient = startContract.createClient('custom-type', customConfig); + + expect(clusterClient).toBe(mockClusterClientInstance); + + expect(MockClusterClient).toHaveBeenCalledTimes(1); + expect(MockClusterClient).toHaveBeenCalledWith( + expect.objectContaining(customConfig), + expect.objectContaining({ context: ['elasticsearch', 'custom-type'] }), + expect.any(Function) + ); + }); + it('creates a new client on each call', async () => { + await elasticsearchService.setup(setupDeps); + const startContract = await elasticsearchService.start(startDeps); + + // reset all mocks called during setup phase + MockClusterClient.mockClear(); + + const customConfig = { logQueries: true }; + + startContract.createClient('custom-type', customConfig); + startContract.createClient('another-type', customConfig); + + expect(MockClusterClient).toHaveBeenCalledTimes(2); + }); + + it('falls back to elasticsearch default config values if property not specified', async () => { + await elasticsearchService.setup(setupDeps); + const startContract = await elasticsearchService.start(startDeps); + + // reset all mocks called during setup phase + MockClusterClient.mockClear(); + + const customConfig = { + hosts: ['http://8.8.8.8'], + logQueries: true, + ssl: { certificate: 'certificate-value' }, + }; + + startContract.createClient('some-custom-type', customConfig); + const config = MockClusterClient.mock.calls[0][0]; - await elasticsearchService.setup(deps); + expect(config).toMatchInlineSnapshot(` + Object { + "healthCheckDelay": "PT0.01S", + "hosts": Array [ + "http://8.8.8.8", + ], + "logQueries": true, + "requestHeadersWhitelist": Array [ + undefined, + ], + "ssl": Object { + "certificate": "certificate-value", + "verificationMode": "none", + }, + } + `); + }); + }); +}); + +describe('#stop', () => { + it('stops both legacy and new clients', async () => { + await elasticsearchService.setup(setupDeps); + await elasticsearchService.start(startDeps); await elasticsearchService.stop(); + expect(mockLegacyClusterClientInstance.close).toHaveBeenCalledTimes(1); expect(mockClusterClientInstance.close).toHaveBeenCalledTimes(1); }); it('stops pollEsNodeVersions even if there are active subscriptions', async (done) => { expect.assertions(2); - const mockClusterClientInstance = elasticsearchServiceMock.createLegacyCustomClusterClient(); - - MockClusterClient.mockImplementationOnce(() => mockClusterClientInstance); - mockClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); - const setupContract = await elasticsearchService.setup(deps); + const setupContract = await elasticsearchService.setup(setupDeps); setupContract.esNodesCompatibility$.subscribe(async () => { - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); await elasticsearchService.stop(); await delay(100); - expect(mockClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); done(); }); }); diff --git a/src/core/server/elasticsearch/elasticsearch_service.ts b/src/core/server/elasticsearch/elasticsearch_service.ts index 4ea10f6ae4e2ed..9b05fb9887a3b9 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.ts @@ -17,17 +17,8 @@ * under the License. */ -import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs'; -import { - filter, - first, - map, - publishReplay, - switchMap, - take, - shareReplay, - takeUntil, -} from 'rxjs/operators'; +import { Observable, Subject } from 'rxjs'; +import { first, map, shareReplay, takeUntil } from 'rxjs/operators'; import { CoreService } from '../../types'; import { merge } from '../../utils'; @@ -35,28 +26,17 @@ import { CoreContext } from '../core_context'; import { Logger } from '../logging'; import { LegacyClusterClient, - ILegacyClusterClient, ILegacyCustomClusterClient, LegacyElasticsearchClientConfig, - LegacyCallAPIOptions, } from './legacy'; +import { ClusterClient, ICustomClusterClient, ElasticsearchClientConfig } from './client'; import { ElasticsearchConfig, ElasticsearchConfigType } from './elasticsearch_config'; import { InternalHttpServiceSetup, GetAuthHeaders } from '../http/'; import { AuditTrailStart, AuditorFactory } from '../audit_trail'; -import { - InternalElasticsearchServiceSetup, - ElasticsearchServiceStart, - ScopeableRequest, -} from './types'; +import { InternalElasticsearchServiceSetup, InternalElasticsearchServiceStart } from './types'; import { pollEsNodesVersion } from './version_check/ensure_es_version'; import { calculateStatus$ } from './status'; -/** @internal */ -interface CoreClusterClients { - config: ElasticsearchConfig; - client: LegacyClusterClient; -} - interface SetupDeps { http: InternalHttpServiceSetup; } @@ -67,18 +47,21 @@ interface StartDeps { /** @internal */ export class ElasticsearchService - implements CoreService { + implements CoreService { private readonly log: Logger; private readonly config$: Observable; - private subscription?: Subscription; private auditorFactory?: AuditorFactory; private stop$ = new Subject(); private kibanaVersion: string; - private createClient?: ( + private getAuthHeaders?: GetAuthHeaders; + + private createLegacyCustomClient?: ( type: string, clientConfig?: Partial ) => ILegacyCustomClusterClient; - private client?: ILegacyClusterClient; + private legacyClient?: LegacyClusterClient; + + private client?: ClusterClient; constructor(private readonly coreContext: CoreContext) { this.kibanaVersion = coreContext.env.packageInfo.version; @@ -91,139 +74,86 @@ export class ElasticsearchService public async setup(deps: SetupDeps): Promise { this.log.debug('Setting up elasticsearch service'); - const clients$ = this.config$.pipe( - filter(() => { - if (this.subscription !== undefined) { - this.log.error('Clients cannot be changed after they are created'); - return false; - } - - return true; - }), - switchMap( - (config) => - new Observable((subscriber) => { - this.log.debug('Creating elasticsearch client'); - - const coreClients = { - config, - client: this.createClusterClient('data', config, deps.http.getAuthHeaders), - }; - - subscriber.next(coreClients); - - return () => { - this.log.debug('Closing elasticsearch client'); - - coreClients.client.close(); - }; - }) - ), - publishReplay(1) - ) as ConnectableObservable; - - this.subscription = clients$.connect(); - const config = await this.config$.pipe(first()).toPromise(); - const client$ = clients$.pipe(map((clients) => clients.client)); - - const client = { - async callAsInternalUser( - endpoint: string, - clientParams: Record = {}, - options?: LegacyCallAPIOptions - ) { - const _client = await client$.pipe(take(1)).toPromise(); - return await _client.callAsInternalUser(endpoint, clientParams, options); - }, - asScoped(request: ScopeableRequest) { - const _clientPromise = client$.pipe(take(1)).toPromise(); - return { - async callAsInternalUser( - endpoint: string, - clientParams: Record = {}, - options?: LegacyCallAPIOptions - ) { - const _client = await _clientPromise; - return await _client - .asScoped(request) - .callAsInternalUser(endpoint, clientParams, options); - }, - async callAsCurrentUser( - endpoint: string, - clientParams: Record = {}, - options?: LegacyCallAPIOptions - ) { - const _client = await _clientPromise; - return await _client - .asScoped(request) - .callAsCurrentUser(endpoint, clientParams, options); - }, - }; - }, - }; - - this.client = client; + this.getAuthHeaders = deps.http.getAuthHeaders; + this.legacyClient = this.createLegacyClusterClient('data', config); const esNodesCompatibility$ = pollEsNodesVersion({ - callWithInternalUser: client.callAsInternalUser, + callWithInternalUser: this.legacyClient.callAsInternalUser, log: this.log, ignoreVersionMismatch: config.ignoreVersionMismatch, esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(), kibanaVersion: this.kibanaVersion, }).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 })); - this.createClient = ( - type: string, - clientConfig: Partial = {} - ) => { + this.createLegacyCustomClient = (type, clientConfig = {}) => { const finalConfig = merge({}, config, clientConfig); - return this.createClusterClient(type, finalConfig, deps.http.getAuthHeaders); + return this.createLegacyClusterClient(type, finalConfig); }; return { legacy: { - config$: clients$.pipe(map((clients) => clients.config)), - client, - createClient: this.createClient, + config$: this.config$, + client: this.legacyClient, + createClient: this.createLegacyCustomClient, }, esNodesCompatibility$, status$: calculateStatus$(esNodesCompatibility$), }; } - public async start({ auditTrail }: StartDeps) { + public async start({ auditTrail }: StartDeps): Promise { this.auditorFactory = auditTrail; - if (typeof this.client === 'undefined' || typeof this.createClient === 'undefined') { + if (!this.legacyClient || !this.createLegacyCustomClient) { throw new Error('ElasticsearchService needs to be setup before calling start'); - } else { - return { - legacy: { - client: this.client, - createClient: this.createClient, - }, - }; } + + const config = await this.config$.pipe(first()).toPromise(); + this.client = this.createClusterClient('data', config); + + const createClient = ( + type: string, + clientConfig: Partial = {} + ): ICustomClusterClient => { + const finalConfig = merge({}, config, clientConfig); + return this.createClusterClient(type, finalConfig); + }; + + return { + client: this.client, + createClient, + legacy: { + client: this.legacyClient, + createClient: this.createLegacyCustomClient, + }, + }; } public async stop() { this.log.debug('Stopping elasticsearch service'); - if (this.subscription !== undefined) { - this.subscription.unsubscribe(); - } this.stop$.next(); + if (this.client) { + this.client.close(); + } + if (this.legacyClient) { + this.legacyClient.close(); + } } - private createClusterClient( - type: string, - config: LegacyElasticsearchClientConfig, - getAuthHeaders?: GetAuthHeaders - ) { + private createClusterClient(type: string, config: ElasticsearchClientConfig) { + return new ClusterClient( + config, + this.coreContext.logger.get('elasticsearch', type), + this.getAuthHeaders + ); + } + + private createLegacyClusterClient(type: string, config: LegacyElasticsearchClientConfig) { return new LegacyClusterClient( config, this.coreContext.logger.get('elasticsearch', type), this.getAuditorFactory, - getAuthHeaders + this.getAuthHeaders ); } diff --git a/src/core/server/elasticsearch/index.ts b/src/core/server/elasticsearch/index.ts index f5f5f5cc7b6f88..8bb77b5dfdee07 100644 --- a/src/core/server/elasticsearch/index.ts +++ b/src/core/server/elasticsearch/index.ts @@ -25,7 +25,15 @@ export { ElasticsearchServiceStart, ElasticsearchStatusMeta, InternalElasticsearchServiceSetup, + InternalElasticsearchServiceStart, FakeRequest, ScopeableRequest, } from './types'; export * from './legacy'; +export { + IClusterClient, + ICustomClusterClient, + ElasticsearchClientConfig, + ElasticsearchClient, + IScopedClusterClient, +} from './client'; diff --git a/src/core/server/elasticsearch/types.ts b/src/core/server/elasticsearch/types.ts index 2b4ba4b0a0a550..40399aecbc4466 100644 --- a/src/core/server/elasticsearch/types.ts +++ b/src/core/server/elasticsearch/types.ts @@ -26,6 +26,7 @@ import { ILegacyClusterClient, ILegacyCustomClusterClient, } from './legacy'; +import { IClusterClient, ICustomClusterClient, ElasticsearchClientConfig } from './client'; import { NodesVersionCompatibility } from './version_check/ensure_es_version'; import { ServiceStatus } from '../status'; @@ -80,6 +81,16 @@ export interface ElasticsearchServiceSetup { }; } +/** @internal */ +export interface InternalElasticsearchServiceSetup { + // Required for the BWC with the legacy Kibana only. + readonly legacy: ElasticsearchServiceSetup['legacy'] & { + readonly config$: Observable; + }; + esNodesCompatibility$: Observable; + status$: Observable>; +} + /** * @public */ @@ -103,7 +114,7 @@ export interface ElasticsearchServiceStart { * * @example * ```js - * const client = elasticsearch.createCluster('my-app-name', config); + * const client = elasticsearch.legacy.createClient('my-app-name', config); * const data = await client.callAsInternalUser(); * ``` */ @@ -113,26 +124,51 @@ export interface ElasticsearchServiceStart { ) => ILegacyCustomClusterClient; /** - * A pre-configured Elasticsearch client. All Elasticsearch config value changes are processed under the hood. - * See {@link ILegacyClusterClient}. + * A pre-configured {@link ILegacyClusterClient | legacy Elasticsearch client}. * * @example * ```js - * const client = core.elasticsearch.client; + * const client = core.elasticsearch.legacy.client; * ``` */ readonly client: ILegacyClusterClient; }; } -/** @internal */ -export interface InternalElasticsearchServiceSetup { - // Required for the BWC with the legacy Kibana only. - readonly legacy: ElasticsearchServiceSetup['legacy'] & { - readonly config$: Observable; - }; - esNodesCompatibility$: Observable; - status$: Observable>; +/** + * @internal + */ +export interface InternalElasticsearchServiceStart extends ElasticsearchServiceStart { + /** + * A pre-configured {@link IClusterClient | Elasticsearch client} + * + * @example + * ```js + * const client = core.elasticsearch.client; + * ``` + */ + readonly client: IClusterClient; + /** + * Create application specific Elasticsearch cluster API client with customized config. See {@link IClusterClient}. + * + * @param type Unique identifier of the client + * @param clientConfig A config consists of Elasticsearch JS client options and + * valid sub-set of Elasticsearch service config. + * We fill all the missing properties in the `clientConfig` using the default + * Elasticsearch config so that we don't depend on default values set and + * controlled by underlying Elasticsearch JS client. + * We don't run validation against the passed config and expect it to be valid. + * + * @example + * ```js + * const client = elasticsearch.createClient('my-app-name', config); + * const data = await client.asInternalUser().search(); + * ``` + */ + readonly createClient: ( + type: string, + clientConfig?: Partial + ) => ICustomClusterClient; } /** @public */ diff --git a/src/core/server/elasticsearch/version_check/ensure_es_version.ts b/src/core/server/elasticsearch/version_check/ensure_es_version.ts index 3f562dac22a028..dc56d982d7b4a2 100644 --- a/src/core/server/elasticsearch/version_check/ensure_es_version.ts +++ b/src/core/server/elasticsearch/version_check/ensure_es_version.ts @@ -29,7 +29,7 @@ import { esVersionEqualsKibana, } from './es_kibana_version_compatability'; import { Logger } from '../../logging'; -import { LegacyAPICaller } from '..'; +import { LegacyAPICaller } from '../legacy'; export interface PollEsNodesVersionOptions { callWithInternalUser: LegacyAPICaller; diff --git a/src/core/server/internal_types.ts b/src/core/server/internal_types.ts index 24080f2529beb7..4f4bf50f07b8e1 100644 --- a/src/core/server/internal_types.ts +++ b/src/core/server/internal_types.ts @@ -22,7 +22,10 @@ import { Type } from '@kbn/config-schema'; import { CapabilitiesSetup, CapabilitiesStart } from './capabilities'; import { ConfigDeprecationProvider } from './config'; import { ContextSetup } from './context'; -import { InternalElasticsearchServiceSetup, ElasticsearchServiceStart } from './elasticsearch'; +import { + InternalElasticsearchServiceSetup, + InternalElasticsearchServiceStart, +} from './elasticsearch'; import { InternalHttpServiceSetup, InternalHttpServiceStart } from './http'; import { InternalSavedObjectsServiceSetup, @@ -58,7 +61,7 @@ export interface InternalCoreSetup { */ export interface InternalCoreStart { capabilities: CapabilitiesStart; - elasticsearch: ElasticsearchServiceStart; + elasticsearch: InternalElasticsearchServiceStart; http: InternalHttpServiceStart; metrics: InternalMetricsServiceStart; savedObjects: InternalSavedObjectsServiceStart; diff --git a/src/core/server/mocks.ts b/src/core/server/mocks.ts index 75ca88627814b3..a3dbb279d19ebc 100644 --- a/src/core/server/mocks.ts +++ b/src/core/server/mocks.ts @@ -177,7 +177,7 @@ function createInternalCoreSetupMock() { function createInternalCoreStartMock() { const startDeps: InternalCoreStart = { capabilities: capabilitiesServiceMock.createStartContract(), - elasticsearch: elasticsearchServiceMock.createStart(), + elasticsearch: elasticsearchServiceMock.createInternalStart(), http: httpServiceMock.createInternalStartContract(), metrics: metricsServiceMock.createStartContract(), savedObjects: savedObjectsServiceMock.createInternalStartContract(), diff --git a/src/core/server/plugins/plugin_context.ts b/src/core/server/plugins/plugin_context.ts index b0f9ff6fd5ebd6..a6dd13a12b5278 100644 --- a/src/core/server/plugins/plugin_context.ts +++ b/src/core/server/plugins/plugin_context.ts @@ -210,7 +210,9 @@ export function createPluginStartContext( capabilities: { resolveCapabilities: deps.capabilities.resolveCapabilities, }, - elasticsearch: deps.elasticsearch, + elasticsearch: { + legacy: deps.elasticsearch.legacy, + }, http: { auth: deps.http.auth, basePath: deps.http.basePath, diff --git a/src/core/server/server.api.md b/src/core/server/server.api.md index ea95329bf8fa42..107edf11bb6f43 100644 --- a/src/core/server/server.api.md +++ b/src/core/server/server.api.md @@ -4,6 +4,7 @@ ```ts +import { ApiResponse } from '@elastic/elasticsearch/lib/Transport'; import Boom from 'boom'; import { BulkIndexDocumentsParams } from 'elasticsearch'; import { CatAliasesParams } from 'elasticsearch'; @@ -21,6 +22,8 @@ import { CatTasksParams } from 'elasticsearch'; import { CatThreadPoolParams } from 'elasticsearch'; import { ClearScrollParams } from 'elasticsearch'; import { Client } from 'elasticsearch'; +import { Client as Client_2 } from '@elastic/elasticsearch'; +import { ClientOptions } from '@elastic/elasticsearch'; import { ClusterAllocationExplainParams } from 'elasticsearch'; import { ClusterGetSettingsParams } from 'elasticsearch'; import { ClusterHealthParams } from 'elasticsearch'; @@ -138,6 +141,8 @@ import { TasksCancelParams } from 'elasticsearch'; import { TasksGetParams } from 'elasticsearch'; import { TasksListParams } from 'elasticsearch'; import { TermvectorsParams } from 'elasticsearch'; +import { TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport'; +import { TransportRequestParams } from '@elastic/elasticsearch/lib/Transport'; import { Type } from '@kbn/config-schema'; import { TypeOf } from '@kbn/config-schema'; import { UpdateDocumentByQueryParams } from 'elasticsearch'; diff --git a/x-pack/test/api_integration/apis/fleet/agents/acks.ts b/x-pack/test/api_integration/apis/fleet/agents/acks.ts index 45833012cb475e..e8381aa9d59ea6 100644 --- a/x-pack/test/api_integration/apis/fleet/agents/acks.ts +++ b/x-pack/test/api_integration/apis/fleet/agents/acks.ts @@ -22,7 +22,7 @@ export default function (providerContext: FtrProviderContext) { before(async () => { await esArchiver.loadIfNeeded('fleet/agents'); - const { body: apiKeyBody } = await esClient.security.createApiKey({ + const { body: apiKeyBody } = await esClient.security.createApiKey({ body: { name: `test access api key: ${uuid.v4()}`, }, diff --git a/x-pack/test/api_integration/apis/fleet/agents/checkin.ts b/x-pack/test/api_integration/apis/fleet/agents/checkin.ts index d24f7f495a06c0..8942deafdd83ce 100644 --- a/x-pack/test/api_integration/apis/fleet/agents/checkin.ts +++ b/x-pack/test/api_integration/apis/fleet/agents/checkin.ts @@ -22,7 +22,7 @@ export default function (providerContext: FtrProviderContext) { before(async () => { await esArchiver.loadIfNeeded('fleet/agents'); - const { body: apiKeyBody } = await esClient.security.createApiKey({ + const { body: apiKeyBody } = await esClient.security.createApiKey({ body: { name: `test access api key: ${uuid.v4()}`, }, diff --git a/x-pack/test/api_integration/apis/fleet/agents/enroll.ts b/x-pack/test/api_integration/apis/fleet/agents/enroll.ts index b4d23a23923207..e9f7471f6437ea 100644 --- a/x-pack/test/api_integration/apis/fleet/agents/enroll.ts +++ b/x-pack/test/api_integration/apis/fleet/agents/enroll.ts @@ -25,7 +25,7 @@ export default function (providerContext: FtrProviderContext) { before(async () => { await esArchiver.loadIfNeeded('fleet/agents'); - const { body: apiKeyBody } = await esClient.security.createApiKey({ + const { body: apiKeyBody } = await esClient.security.createApiKey({ body: { name: `test access api key: ${uuid.v4()}`, }, diff --git a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/status.ts b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/status.ts index 0d299889139ef3..29788e0cc751d4 100644 --- a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/status.ts +++ b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/status.ts @@ -6,6 +6,7 @@ import expect from '@kbn/expect'; import { FtrProviderContext } from '../../api_integration/ftr_provider_context'; +import { ClusterStateAPIResponse } from '../../../plugins/upgrade_assistant/common/types'; import { getIndexStateFromClusterState } from '../../../plugins/upgrade_assistant/common/get_index_state_from_cluster_state'; // eslint-disable-next-line import/no-default-export @@ -28,7 +29,7 @@ export default function ({ getService }: FtrProviderContext) { it('the _cluster/state endpoint is still what we expect', async () => { await esArchiver.load('upgrade_assistant/reindex'); await es.indices.close({ index: '6.0-data' }); - const result = await es.cluster.state({ + const result = await es.cluster.state({ index: '6.0-data', metric: 'metadata', }); diff --git a/yarn.lock b/yarn.lock index c6a508b68a2006..1c5b7ed6a93c4b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2236,6 +2236,17 @@ redux-immutable-state-invariant "^2.1.0" redux-logger "^3.0.6" +"@elastic/elasticsearch@7.8.0": + version "7.8.0" + resolved "https://registry.yarnpkg.com/@elastic/elasticsearch/-/elasticsearch-7.8.0.tgz#3f9ee54fe8ef79874ebd231db03825fa500a7111" + integrity sha512-rUOTNN1At0KoN0Fcjd6+J7efghuURnoMTB/od9EMK6Mcdebi6N3z5ulShTsKRn6OanS9Eq3l/OmheQY1Y+WLcg== + dependencies: + debug "^4.1.1" + decompress-response "^4.2.0" + ms "^2.1.1" + pump "^3.0.0" + secure-json-parse "^2.1.0" + "@elastic/elasticsearch@^7.4.0": version "7.4.0" resolved "https://registry.yarnpkg.com/@elastic/elasticsearch/-/elasticsearch-7.4.0.tgz#57f4066acf25e9d4e9b4f6376088433aae6f25d4" @@ -27841,6 +27852,11 @@ scss-tokenizer@^0.2.3: js-base64 "^2.1.8" source-map "^0.4.2" +secure-json-parse@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/secure-json-parse/-/secure-json-parse-2.1.0.tgz#ae76f5624256b5c497af887090a5d9e156c9fb20" + integrity sha512-GckO+MS/wT4UogDyoI/H/S1L0MCcKS1XX/vp48wfmU7Nw4woBmb8mIpu4zPBQjKlRT88/bt9xdoV4111jPpNJA== + seedrandom@^3.0.5: version "3.0.5" resolved "https://registry.yarnpkg.com/seedrandom/-/seedrandom-3.0.5.tgz#54edc85c95222525b0c7a6f6b3543d8e0b3aa0a7"