Skip to content

Commit

Permalink
remove clusterManager.worker changes
Browse files Browse the repository at this point in the history
  • Loading branch information
spalger committed Nov 20, 2020
1 parent 31e43f2 commit c77521b
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions src/dev/cli_workers/cluster_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ export type SomeCliArgs = Pick<
>;

const firstAllTrue = (...sources: Array<Rx.Observable<boolean>>) =>
Rx.combineLatest(sources).pipe(
Rx.combineLatest(...sources).pipe(
filter((values) => values.every((v) => v === true)),
take(1),
mapTo(undefined)
);

export class ClusterManager {
public server: Worker;
public workers: Worker[];

private watcher: FSWatcher | null = null;
private basePathProxy: BasePathProxyServer | undefined;
Expand Down Expand Up @@ -99,12 +100,14 @@ export class ClusterManager {
);
}

this.server = new Worker({
type: 'server',
log: this.log,
argv: serverArgv,
apmServiceName: 'kibana',
});
this.workers = [
(this.server = new Worker({
type: 'server',
log: this.log,
argv: serverArgv,
apmServiceName: 'kibana',
})),
];

// write server status to the serverReady$ subject
Rx.merge(
Expand All @@ -115,13 +118,27 @@ export class ClusterManager {
.pipe(startWith(this.server.listening || this.server.crashed))
.subscribe(this.serverReady$);

// broker messages between workers
this.workers.forEach((worker) => {
worker.on('broadcast', (msg) => {
this.workers.forEach((to) => {
if (to !== worker && to.online) {
to.fork!.send(msg);
}
});
});
});

// When receive that event from server worker
// forward a reloadLoggingConfig message to master
// and all workers. This is only used by LogRotator service
// when the cluster mode is enabled
this.server.on('reloadLoggingConfigFromServerWorker', () => {
process.emit('message' as any, { reloadLoggingConfig: true } as any);
this.server.fork?.send({ reloadLoggingConfig: true });

this.workers.forEach((worker) => {
worker.fork!.send({ reloadLoggingConfig: true });
});
});

if (opts.watch) {
Expand Down Expand Up @@ -154,7 +171,9 @@ export class ClusterManager {

startCluster() {
this.setupManualRestart();
this.server.start();
for (const worker of this.workers) {
worker.start();
}
if (this.basePathProxy) {
this.basePathProxy.start({
delayUntil: () => firstAllTrue(this.serverReady$, this.kbnOptimizerReady$),
Expand Down Expand Up @@ -291,7 +310,9 @@ export class ClusterManager {
};

onWatcherChange = (e: any, path: string) => {
this.server.onChange(path);
for (const worker of this.workers) {
worker.onChange(path);
}
};

onWatcherError = (err: any) => {
Expand Down

0 comments on commit c77521b

Please sign in to comment.