Skip to content

Commit

Permalink
Use ES API from start contract (elastic#66157)
Browse files Browse the repository at this point in the history
* watcher uses es api from start

* CCR uses ES API from start contract

* Rollup uses ES API from start contract

* Transform uses ES API from start contract

* Snapshot_restore uses ES API from start contract

* remove excessive logging. platform logs all the lifecycles

* file uploader uses ES API from start contract

* remove unnecessary async

* use async getter

* update rollup custom client usage

* address cj comment

* roll back changes. maps tests are failing

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
mshustov and elasticmachine committed May 16, 2020
1 parent e18895b commit ed736e8
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 41 deletions.
26 changes: 19 additions & 7 deletions x-pack/plugins/cross_cluster_replication/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { first } from 'rxjs/operators';
import { i18n } from '@kbn/i18n';
import {
CoreSetup,
ICustomClusterClient,
Plugin,
Logger,
PluginInitializerContext,
Expand All @@ -36,6 +37,13 @@ interface CrossClusterReplicationContext {
client: IScopedClusterClient;
}

async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
// Extend the elasticsearchJs client with additional endpoints.
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('crossClusterReplication', esClientConfig);
}

const ccrDataEnricher = async (indicesList: Index[], callWithRequest: APICaller) => {
if (!indicesList?.length) {
return indicesList;
Expand Down Expand Up @@ -69,6 +77,7 @@ export class CrossClusterReplicationServerPlugin implements Plugin<void, void, a
private readonly config$: Observable<CrossClusterReplicationConfig>;
private readonly license: License;
private readonly logger: Logger;
private ccrEsClient?: ICustomClusterClient;

constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
Expand All @@ -77,7 +86,7 @@ export class CrossClusterReplicationServerPlugin implements Plugin<void, void, a
}

setup(
{ http, elasticsearch }: CoreSetup,
{ http, getStartServices }: CoreSetup,
{ licensing, indexManagement, remoteClusters }: Dependencies
) {
this.config$
Expand Down Expand Up @@ -115,12 +124,10 @@ export class CrossClusterReplicationServerPlugin implements Plugin<void, void, a
}
);

// Extend the elasticsearchJs client with additional endpoints.
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
const ccrEsClient = elasticsearch.createClient('crossClusterReplication', esClientConfig);
http.registerRouteHandlerContext('crossClusterReplication', (ctx, request) => {
http.registerRouteHandlerContext('crossClusterReplication', async (ctx, request) => {
this.ccrEsClient = this.ccrEsClient ?? (await getCustomEsClient(getStartServices));
return {
client: ccrEsClient.asScoped(request),
client: this.ccrEsClient.asScoped(request),
};
});

Expand All @@ -135,5 +142,10 @@ export class CrossClusterReplicationServerPlugin implements Plugin<void, void, a
}

start() {}
stop() {}

stop() {
if (this.ccrEsClient) {
this.ccrEsClient.close();
}
}
}
32 changes: 24 additions & 8 deletions x-pack/plugins/rollup/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Observable } from 'rxjs';
import { first } from 'rxjs/operators';
import {
CoreSetup,
ICustomClusterClient,
Plugin,
Logger,
KibanaRequest,
Expand Down Expand Up @@ -42,11 +43,18 @@ import { mergeCapabilitiesWithFields } from './lib/merge_capabilities_with_field
interface RollupContext {
client: IScopedClusterClient;
}
async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
// Extend the elasticsearchJs client with additional endpoints.
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('rollup', esClientConfig);
}

export class RollupPlugin implements Plugin<void, void, any, any> {
private readonly logger: Logger;
private readonly globalConfig$: Observable<SharedGlobalConfig>;
private readonly license: License;
private rollupEsClient?: ICustomClusterClient;

constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
Expand All @@ -55,7 +63,7 @@ export class RollupPlugin implements Plugin<void, void, any, any> {
}

public setup(
{ http, uiSettings, elasticsearch }: CoreSetup,
{ http, uiSettings, getStartServices }: CoreSetup,
{ licensing, indexManagement, visTypeTimeseries, usageCollection }: Dependencies
) {
this.license.setup(
Expand All @@ -72,12 +80,10 @@ export class RollupPlugin implements Plugin<void, void, any, any> {
}
);

// Extend the elasticsearchJs client with additional endpoints.
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
const rollupEsClient = elasticsearch.createClient('rollup', esClientConfig);
http.registerRouteHandlerContext('rollup', (context, request) => {
http.registerRouteHandlerContext('rollup', async (context, request) => {
this.rollupEsClient = this.rollupEsClient ?? (await getCustomEsClient(getStartServices));
return {
client: rollupEsClient.asScoped(request),
client: this.rollupEsClient.asScoped(request),
};
});

Expand Down Expand Up @@ -116,7 +122,12 @@ export class RollupPlugin implements Plugin<void, void, any, any> {
const callWithRequestFactoryShim = (
elasticsearchServiceShim: CallWithRequestFactoryShim,
request: KibanaRequest
): APICaller => rollupEsClient.asScoped(request).callAsCurrentUser;
): APICaller => {
return async (...args: Parameters<APICaller>) => {
this.rollupEsClient = this.rollupEsClient ?? (await getCustomEsClient(getStartServices));
return await this.rollupEsClient.asScoped(request).callAsCurrentUser(...args);
};
};

const { addSearchStrategy } = visTypeTimeseries;
registerRollupSearchStrategy(callWithRequestFactoryShim, addSearchStrategy);
Expand All @@ -140,5 +151,10 @@ export class RollupPlugin implements Plugin<void, void, any, any> {
}

start() {}
stop() {}

stop() {
if (this.rollupEsClient) {
this.rollupEsClient.close();
}
}
}
26 changes: 17 additions & 9 deletions x-pack/plugins/snapshot_restore/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { first } from 'rxjs/operators';
import { i18n } from '@kbn/i18n';
import {
CoreSetup,
ICustomClusterClient,
Plugin,
Logger,
PluginInitializerContext,
Expand All @@ -31,10 +32,17 @@ export interface SnapshotRestoreContext {
client: IScopedClusterClient;
}

async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('snapshotRestore', esClientConfig);
}

export class SnapshotRestoreServerPlugin implements Plugin<void, void, any, any> {
private readonly logger: Logger;
private readonly apiRoutes: ApiRoutes;
private readonly license: License;
private snapshotRestoreESClient?: ICustomClusterClient;

constructor(private context: PluginInitializerContext) {
const { logger } = this.context;
Expand All @@ -44,7 +52,7 @@ export class SnapshotRestoreServerPlugin implements Plugin<void, void, any, any>
}

public async setup(
{ http, elasticsearch }: CoreSetup,
{ http, getStartServices }: CoreSetup,
{ licensing, security, cloud }: Dependencies
): Promise<void> {
const pluginConfig = await this.context.config
Expand Down Expand Up @@ -72,11 +80,11 @@ export class SnapshotRestoreServerPlugin implements Plugin<void, void, any, any>
}
);

const esClientConfig = { plugins: [elasticsearchJsPlugin] };
const snapshotRestoreESClient = elasticsearch.createClient('snapshotRestore', esClientConfig);
http.registerRouteHandlerContext('snapshotRestore', (ctx, request) => {
http.registerRouteHandlerContext('snapshotRestore', async (ctx, request) => {
this.snapshotRestoreESClient =
this.snapshotRestoreESClient ?? (await getCustomEsClient(getStartServices));
return {
client: snapshotRestoreESClient.asScoped(request),
client: this.snapshotRestoreESClient.asScoped(request),
};
});

Expand All @@ -95,11 +103,11 @@ export class SnapshotRestoreServerPlugin implements Plugin<void, void, any, any>
});
}

public start() {
this.logger.debug('Starting plugin');
}
public start() {}

public stop() {
this.logger.debug('Stopping plugin');
if (this.snapshotRestoreESClient) {
this.snapshotRestoreESClient.close();
}
}
}
27 changes: 20 additions & 7 deletions x-pack/plugins/transform/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { i18n } from '@kbn/i18n';
import {
CoreSetup,
ICustomClusterClient,
Plugin,
IScopedClusterClient,
Logger,
Expand Down Expand Up @@ -38,18 +39,26 @@ const PLUGIN = {
}),
};

async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
return core.elasticsearch.legacy.createClient('transform', {
plugins: [elasticsearchJsPlugin],
});
}

export class TransformServerPlugin implements Plugin<{}, void, any, any> {
private readonly apiRoutes: ApiRoutes;
private readonly license: License;
private readonly logger: Logger;
private transformESClient?: ICustomClusterClient;

constructor(initContext: PluginInitializerContext) {
this.logger = initContext.logger.get();
this.apiRoutes = new ApiRoutes();
this.license = new License();
}

setup({ elasticsearch, http }: CoreSetup, { licensing }: Dependencies): {} {
setup({ http, getStartServices }: CoreSetup, { licensing }: Dependencies): {} {
const router = http.createRouter();

this.license.setup(
Expand All @@ -72,18 +81,22 @@ export class TransformServerPlugin implements Plugin<{}, void, any, any> {
});

// Can access via new platform router's handler function 'context' parameter - context.transform.client
const transformClient = elasticsearch.createClient('transform', {
plugins: [elasticsearchJsPlugin],
});
http.registerRouteHandlerContext('transform', (context, request) => {
http.registerRouteHandlerContext('transform', async (context, request) => {
this.transformESClient =
this.transformESClient ?? (await getCustomEsClient(getStartServices));
return {
dataClient: transformClient.asScoped(request),
dataClient: this.transformESClient.asScoped(request),
};
});

return {};
}

start() {}
stop() {}

stop() {
if (this.transformESClient) {
this.transformESClient.close();
}
}
}
30 changes: 20 additions & 10 deletions x-pack/plugins/watcher/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ declare module 'kibana/server' {

import {
CoreSetup,
ICustomClusterClient,
IScopedClusterClient,
Logger,
Plugin,
Expand All @@ -33,8 +34,15 @@ export interface WatcherContext {
client: IScopedClusterClient;
}

async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
const esConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('watcher', esConfig);
}

export class WatcherServerPlugin implements Plugin<void, void, any, any> {
log: Logger;
private readonly log: Logger;
private watcherESClient?: ICustomClusterClient;

private licenseStatus: LicenseStatus = {
hasRequired: false,
Expand All @@ -44,21 +52,17 @@ export class WatcherServerPlugin implements Plugin<void, void, any, any> {
this.log = ctx.logger.get();
}

async setup(
{ http, elasticsearch: elasticsearchService }: CoreSetup,
{ licensing }: Dependencies
) {
async setup({ http, getStartServices }: CoreSetup, { licensing }: Dependencies) {
const router = http.createRouter();
const routeDependencies: RouteDependencies = {
router,
getLicenseStatus: () => this.licenseStatus,
};

const config = { plugins: [elasticsearchJsPlugin] };
const watcherESClient = elasticsearchService.createClient('watcher', config);
http.registerRouteHandlerContext('watcher', (ctx, request) => {
http.registerRouteHandlerContext('watcher', async (ctx, request) => {
this.watcherESClient = this.watcherESClient ?? (await getCustomEsClient(getStartServices));
return {
client: watcherESClient.asScoped(request),
client: this.watcherESClient.asScoped(request),
};
});

Expand Down Expand Up @@ -89,6 +93,12 @@ export class WatcherServerPlugin implements Plugin<void, void, any, any> {
}
});
}

start() {}
stop() {}

stop() {
if (this.watcherESClient) {
this.watcherESClient.close();
}
}
}

0 comments on commit ed736e8

Please sign in to comment.