Skip to content

Commit

Permalink
Implement LegacyService. Use core to start legacy Kibana.
Browse files Browse the repository at this point in the history
  • Loading branch information
azasypkin committed Aug 24, 2018
1 parent 51fb213 commit f9103f3
Show file tree
Hide file tree
Showing 56 changed files with 1,711 additions and 1,264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
/* eslint-env jest */

import EventEmitter from 'events';
import { assign, random } from 'lodash';
import sinon from 'sinon';
import cluster from 'cluster';
import { delay } from 'bluebird';

export default class MockClusterFork extends EventEmitter {
constructor() {
class MockClusterFork extends EventEmitter {
constructor(cluster) {
super();

let dead = true;
Expand All @@ -35,7 +34,7 @@ export default class MockClusterFork extends EventEmitter {

assign(this, {
process: {
kill: sinon.spy(() => {
kill: jest.fn(() => {
(async () => {
await wait();
this.emit('disconnect');
Expand All @@ -46,13 +45,13 @@ export default class MockClusterFork extends EventEmitter {
})();
}),
},
isDead: sinon.spy(() => dead),
send: sinon.stub()
isDead: jest.fn(() => dead),
send: jest.fn()
});

sinon.spy(this, 'on');
sinon.spy(this, 'removeListener');
sinon.spy(this, 'emit');
jest.spyOn(this, 'on');
jest.spyOn(this, 'removeListener');
jest.spyOn(this, 'emit');

(async () => {
await wait();
Expand All @@ -61,3 +60,12 @@ export default class MockClusterFork extends EventEmitter {
})();
}
}

class MockCluster extends EventEmitter {
fork = jest.fn(() => new MockClusterFork(this));
setupMaster = jest.fn();
}

export function mockCluster() {
return new MockCluster();
}
60 changes: 37 additions & 23 deletions src/cli/cluster/cluster_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,47 @@

import { resolve } from 'path';
import { debounce, invoke, bindAll, once, uniq } from 'lodash';
import { fromEvent, race } from 'rxjs';
import { first } from 'rxjs/operators';

import Log from '../log';
import Worker from './worker';
import { Config } from '../../server/config/config';
import { transformDeprecations } from '../../server/config/transform_deprecations';
import { configureBasePathProxy } from './configure_base_path_proxy';

process.env.kbnWorkerType = 'managr';

export default class ClusterManager {
static async create(opts = {}, settings = {}) {
const transformedSettings = transformDeprecations(settings);
const config = Config.withDefaultSchema(transformedSettings);

const basePathProxy = opts.basePath
? await configureBasePathProxy(config)
: undefined;

return new ClusterManager(opts, config, basePathProxy);
static create(opts, settings = {}, basePathProxy) {
return new ClusterManager(
opts,
Config.withDefaultSchema(transformDeprecations(settings)),
basePathProxy
);
}

constructor(opts, config, basePathProxy) {
this.log = new Log(opts.quiet, opts.silent);
this.addedCount = 0;
this.inReplMode = !!opts.repl;
this.basePathProxy = basePathProxy;
this.config = config;

const serverArgv = [];
const optimizerArgv = [
'--plugins.initialize=false',
'--server.autoListen=false',
];

if (basePathProxy) {
this.basePathProxy = basePathProxy;

if (this.basePathProxy) {
optimizerArgv.push(
`--server.basePath=${this.basePathProxy.getBasePath()}`,
`--server.basePath=${this.basePathProxy.basePath}`,
'--server.rewriteBasePath=true',
);

serverArgv.push(
`--server.port=${this.basePathProxy.getTargetPort()}`,
`--server.basePath=${this.basePathProxy.getBasePath()}`,
`--server.port=${this.basePathProxy.targetPort}`,
`--server.basePath=${this.basePathProxy.basePath}`,
'--server.rewriteBasePath=true',
);
}
Expand All @@ -82,12 +80,6 @@ export default class ClusterManager {
})
];

if (basePathProxy) {
// Pass server worker to the basepath proxy so that it can hold off the
// proxying until server worker is ready.
this.basePathProxy.serverWorker = this.server;
}

// broker messages between workers
this.workers.forEach((worker) => {
worker.on('broadcast', (msg) => {
Expand Down Expand Up @@ -130,7 +122,10 @@ export default class ClusterManager {
this.setupManualRestart();
invoke(this.workers, 'start');
if (this.basePathProxy) {
this.basePathProxy.start();
this.basePathProxy.start({
blockUntil: this.blockUntil.bind(this),
shouldRedirectFromOldBasePath: this.shouldRedirectFromOldBasePath.bind(this),
});
}
}

Expand Down Expand Up @@ -222,4 +217,23 @@ export default class ClusterManager {
this.log.bad('failed to watch files!\n', err.stack);
process.exit(1); // eslint-disable-line no-process-exit
}

shouldRedirectFromOldBasePath(path) {
const isApp = path.startsWith('app/');
const isKnownShortPath = ['login', 'logout', 'status'].includes(path);

return isApp || isKnownShortPath;
}

blockUntil() {
// Wait until `server` worker either crashes or starts to listen.
if (this.server.listening || this.server.crashed) {
return Promise.resolve();
}

return race(
fromEvent(this.server, 'listening'),
fromEvent(this.server, 'crashed')
).pipe(first()).toPromise();
}
}
146 changes: 127 additions & 19 deletions src/cli/cluster/cluster_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,43 @@
* under the License.
*/

import sinon from 'sinon';
import { mockCluster }from './__mocks__/cluster';
jest.mock('cluster', () => mockCluster());
jest.mock('readline', () => ({
createInterface: jest.fn(() => ({
on: jest.fn(),
prompt: jest.fn(),
setPrompt: jest.fn(),
})),
}));

import cluster from 'cluster';
import { sample } from 'lodash';

import ClusterManager from './cluster_manager';
import Worker from './worker';

describe('CLI cluster manager', function () {
const sandbox = sinon.createSandbox();

beforeEach(function () {
sandbox.stub(cluster, 'fork').callsFake(() => {
describe('CLI cluster manager', () => {
beforeEach(() => {
cluster.fork.mockImplementation(() => {
return {
process: {
kill: sinon.stub(),
kill: jest.fn(),
},
isDead: sinon.stub().returns(false),
removeListener: sinon.stub(),
on: sinon.stub(),
send: sinon.stub()
isDead: jest.fn().mockReturnValue(false),
removeListener: jest.fn(),
addListener: jest.fn(),
send: jest.fn()
};
});
});

afterEach(function () {
sandbox.restore();
afterEach(() => {
cluster.fork.mockReset();
});

it('has two workers', async function () {
const manager = await ClusterManager.create({});
test('has two workers', () => {
const manager = ClusterManager.create({});

expect(manager.workers).toHaveLength(2);
for (const worker of manager.workers) expect(worker).toBeInstanceOf(Worker);
Expand All @@ -55,8 +62,8 @@ describe('CLI cluster manager', function () {
expect(manager.server).toBeInstanceOf(Worker);
});

it('delivers broadcast messages to other workers', async function () {
const manager = await ClusterManager.create({});
test('delivers broadcast messages to other workers', () => {
const manager = ClusterManager.create({});

for (const worker of manager.workers) {
Worker.prototype.start.call(worker);// bypass the debounced start method
Expand All @@ -69,10 +76,111 @@ describe('CLI cluster manager', function () {
messenger.emit('broadcast', football);
for (const worker of manager.workers) {
if (worker === messenger) {
expect(worker.fork.send.callCount).toBe(0);
expect(worker.fork.send).not.toHaveBeenCalled();
} else {
expect(worker.fork.send.firstCall.args[0]).toBe(football);
expect(worker.fork.send).toHaveBeenCalledTimes(1);
expect(worker.fork.send).toHaveBeenCalledWith(football);
}
}
});

describe('interaction with BasePathProxy', () => {
test('correctly configures `BasePathProxy`.', async () => {
const basePathProxyMock = { start: jest.fn() };

ClusterManager.create({}, {}, basePathProxyMock);

expect(basePathProxyMock.start).toHaveBeenCalledWith({
shouldRedirectFromOldBasePath: expect.any(Function),
blockUntil: expect.any(Function),
});
});

describe('proxy is configured with the correct `shouldRedirectFromOldBasePath` and `blockUntil` functions.', () => {
let clusterManager;
let shouldRedirectFromOldBasePath;
let blockUntil;
beforeEach(async () => {
const basePathProxyMock = { start: jest.fn() };

clusterManager = ClusterManager.create({}, {}, basePathProxyMock);

jest.spyOn(clusterManager.server, 'addListener');
jest.spyOn(clusterManager.server, 'removeListener');

[[{ blockUntil, shouldRedirectFromOldBasePath }]] = basePathProxyMock.start.mock.calls;
});

test('`shouldRedirectFromOldBasePath()` returns `false` for unknown paths.', () => {
expect(shouldRedirectFromOldBasePath('')).toBe(false);
expect(shouldRedirectFromOldBasePath('some-path/')).toBe(false);
expect(shouldRedirectFromOldBasePath('some-other-path')).toBe(false);
});

test('`shouldRedirectFromOldBasePath()` returns `true` for `app` and other known paths.', () => {
expect(shouldRedirectFromOldBasePath('app/')).toBe(true);
expect(shouldRedirectFromOldBasePath('login')).toBe(true);
expect(shouldRedirectFromOldBasePath('logout')).toBe(true);
expect(shouldRedirectFromOldBasePath('status')).toBe(true);
});

test('`blockUntil()` resolves immediately if worker has already crashed.', async () => {
clusterManager.server.crashed = true;

await expect(blockUntil()).resolves.not.toBeDefined();
expect(clusterManager.server.addListener).not.toHaveBeenCalled();
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();
});

test('`blockUntil()` resolves immediately if worker is already listening.', async () => {
clusterManager.server.listening = true;

await expect(blockUntil()).resolves.not.toBeDefined();
expect(clusterManager.server.addListener).not.toHaveBeenCalled();
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();
});

test('`blockUntil()` resolves when worker crashes.', async () => {
const blockUntilPromise = blockUntil();

expect(clusterManager.server.addListener).toHaveBeenCalledTimes(2);
expect(clusterManager.server.addListener).toHaveBeenCalledWith(
'crashed',
expect.any(Function)
);

const [, [eventName, onCrashed]] = clusterManager.server.addListener.mock.calls;
// Check event name to make sure we call the right callback,
// in Jest 23 we could use `toHaveBeenNthCalledWith` instead.
expect(eventName).toBe('crashed');
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();

onCrashed();
await expect(blockUntilPromise).resolves.not.toBeDefined();

expect(clusterManager.server.removeListener).toHaveBeenCalledTimes(2);
});

test('`blockUntil()` resolves when worker starts listening.', async () => {
const blockUntilPromise = blockUntil();

expect(clusterManager.server.addListener).toHaveBeenCalledTimes(2);
expect(clusterManager.server.addListener).toHaveBeenCalledWith(
'listening',
expect.any(Function)
);

const [[eventName, onListening]] = clusterManager.server.addListener.mock.calls;
// Check event name to make sure we call the right callback,
// in Jest 23 we could use `toHaveBeenNthCalledWith` instead.
expect(eventName).toBe('listening');
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();

onListening();
await expect(blockUntilPromise).resolves.not.toBeDefined();

expect(clusterManager.server.removeListener).toHaveBeenCalledTimes(2);
});
});
});
});
Loading

0 comments on commit f9103f3

Please sign in to comment.