Skip to content

Commit

Permalink
Implement LegacyService. Use core to start legacy Kibana. (#22190)
Browse files Browse the repository at this point in the history
* Implement `LegacyService`. Use `core` to start legacy Kibana.

* Fix Worker tests

* Do not rely on kbnServer when testing mixins.
  • Loading branch information
azasypkin authored Sep 6, 2018
1 parent 5f96c90 commit 3d6de7c
Show file tree
Hide file tree
Showing 70 changed files with 2,152 additions and 1,711 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
"@types/redux-actions": "^2.2.1",
"@types/sinon": "^5.0.0",
"@types/strip-ansi": "^3.0.0",
"@types/supertest": "^2.0.4",
"@types/supertest": "^2.0.5",
"@types/type-detect": "^4.0.1",
"angular-mocks": "1.4.7",
"babel-eslint": "8.1.2",
Expand Down Expand Up @@ -318,8 +318,8 @@
"simple-git": "1.37.0",
"sinon": "^5.0.7",
"strip-ansi": "^3.0.1",
"supertest": "3.0.0",
"supertest-as-promised": "4.0.2",
"supertest": "^3.1.0",
"supertest-as-promised": "^4.0.2",
"tree-kill": "^1.1.0",
"ts-jest": "^22.4.6",
"ts-loader": "^3.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
/* eslint-env jest */

import EventEmitter from 'events';
import { assign, random } from 'lodash';
import sinon from 'sinon';
import cluster from 'cluster';
import { delay } from 'bluebird';

export default class MockClusterFork extends EventEmitter {
constructor() {
class MockClusterFork extends EventEmitter {
constructor(cluster) {
super();

let dead = true;
Expand All @@ -35,7 +34,7 @@ export default class MockClusterFork extends EventEmitter {

assign(this, {
process: {
kill: sinon.spy(() => {
kill: jest.fn(() => {
(async () => {
await wait();
this.emit('disconnect');
Expand All @@ -46,13 +45,13 @@ export default class MockClusterFork extends EventEmitter {
})();
}),
},
isDead: sinon.spy(() => dead),
send: sinon.stub()
isDead: jest.fn(() => dead),
send: jest.fn()
});

sinon.spy(this, 'on');
sinon.spy(this, 'removeListener');
sinon.spy(this, 'emit');
jest.spyOn(this, 'on');
jest.spyOn(this, 'removeListener');
jest.spyOn(this, 'emit');

(async () => {
await wait();
Expand All @@ -61,3 +60,12 @@ export default class MockClusterFork extends EventEmitter {
})();
}
}

class MockCluster extends EventEmitter {
fork = jest.fn(() => new MockClusterFork(this));
setupMaster = jest.fn();
}

export function mockCluster() {
return new MockCluster();
}
59 changes: 36 additions & 23 deletions src/cli/cluster/cluster_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,46 @@

import { resolve } from 'path';
import { debounce, invoke, bindAll, once, uniq } from 'lodash';
import { fromEvent, race } from 'rxjs';
import { first } from 'rxjs/operators';

import Log from '../log';
import Worker from './worker';
import { Config } from '../../server/config/config';
import { transformDeprecations } from '../../server/config/transform_deprecations';
import { configureBasePathProxy } from './configure_base_path_proxy';

process.env.kbnWorkerType = 'managr';

export default class ClusterManager {
static async create(opts = {}, settings = {}) {
const transformedSettings = transformDeprecations(settings);
const config = Config.withDefaultSchema(transformedSettings);

const basePathProxy = opts.basePath
? await configureBasePathProxy(config)
: undefined;

return new ClusterManager(opts, config, basePathProxy);
static create(opts, settings = {}, basePathProxy) {
return new ClusterManager(
opts,
Config.withDefaultSchema(transformDeprecations(settings)),
basePathProxy
);
}

constructor(opts, config, basePathProxy) {
this.log = new Log(opts.quiet, opts.silent);
this.addedCount = 0;
this.inReplMode = !!opts.repl;
this.basePathProxy = basePathProxy;

const serverArgv = [];
const optimizerArgv = [
'--plugins.initialize=false',
'--server.autoListen=false',
];

if (basePathProxy) {
this.basePathProxy = basePathProxy;

if (this.basePathProxy) {
optimizerArgv.push(
`--server.basePath=${this.basePathProxy.getBasePath()}`,
`--server.basePath=${this.basePathProxy.basePath}`,
'--server.rewriteBasePath=true',
);

serverArgv.push(
`--server.port=${this.basePathProxy.getTargetPort()}`,
`--server.basePath=${this.basePathProxy.getBasePath()}`,
`--server.port=${this.basePathProxy.targetPort}`,
`--server.basePath=${this.basePathProxy.basePath}`,
'--server.rewriteBasePath=true',
);
}
Expand All @@ -82,12 +79,6 @@ export default class ClusterManager {
})
];

if (basePathProxy) {
// Pass server worker to the basepath proxy so that it can hold off the
// proxying until server worker is ready.
this.basePathProxy.serverWorker = this.server;
}

// broker messages between workers
this.workers.forEach((worker) => {
worker.on('broadcast', (msg) => {
Expand Down Expand Up @@ -130,7 +121,10 @@ export default class ClusterManager {
this.setupManualRestart();
invoke(this.workers, 'start');
if (this.basePathProxy) {
this.basePathProxy.start();
this.basePathProxy.start({
blockUntil: this.blockUntil.bind(this),
shouldRedirectFromOldBasePath: this.shouldRedirectFromOldBasePath.bind(this),
});
}
}

Expand Down Expand Up @@ -222,4 +216,23 @@ export default class ClusterManager {
this.log.bad('failed to watch files!\n', err.stack);
process.exit(1); // eslint-disable-line no-process-exit
}

shouldRedirectFromOldBasePath(path) {
const isApp = path.startsWith('app/');
const isKnownShortPath = ['login', 'logout', 'status'].includes(path);

return isApp || isKnownShortPath;
}

blockUntil() {
// Wait until `server` worker either crashes or starts to listen.
if (this.server.listening || this.server.crashed) {
return Promise.resolve();
}

return race(
fromEvent(this.server, 'listening'),
fromEvent(this.server, 'crashed')
).pipe(first()).toPromise();
}
}
146 changes: 127 additions & 19 deletions src/cli/cluster/cluster_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,43 @@
* under the License.
*/

import sinon from 'sinon';
import { mockCluster } from './__mocks__/cluster';
jest.mock('cluster', () => mockCluster());
jest.mock('readline', () => ({
createInterface: jest.fn(() => ({
on: jest.fn(),
prompt: jest.fn(),
setPrompt: jest.fn(),
})),
}));

import cluster from 'cluster';
import { sample } from 'lodash';

import ClusterManager from './cluster_manager';
import Worker from './worker';

describe('CLI cluster manager', function () {
const sandbox = sinon.createSandbox();

beforeEach(function () {
sandbox.stub(cluster, 'fork').callsFake(() => {
describe('CLI cluster manager', () => {
beforeEach(() => {
cluster.fork.mockImplementation(() => {
return {
process: {
kill: sinon.stub(),
kill: jest.fn(),
},
isDead: sinon.stub().returns(false),
removeListener: sinon.stub(),
on: sinon.stub(),
send: sinon.stub()
isDead: jest.fn().mockReturnValue(false),
removeListener: jest.fn(),
addListener: jest.fn(),
send: jest.fn()
};
});
});

afterEach(function () {
sandbox.restore();
afterEach(() => {
cluster.fork.mockReset();
});

it('has two workers', async function () {
const manager = await ClusterManager.create({});
test('has two workers', () => {
const manager = ClusterManager.create({});

expect(manager.workers).toHaveLength(2);
for (const worker of manager.workers) expect(worker).toBeInstanceOf(Worker);
Expand All @@ -55,8 +62,8 @@ describe('CLI cluster manager', function () {
expect(manager.server).toBeInstanceOf(Worker);
});

it('delivers broadcast messages to other workers', async function () {
const manager = await ClusterManager.create({});
test('delivers broadcast messages to other workers', () => {
const manager = ClusterManager.create({});

for (const worker of manager.workers) {
Worker.prototype.start.call(worker);// bypass the debounced start method
Expand All @@ -69,10 +76,111 @@ describe('CLI cluster manager', function () {
messenger.emit('broadcast', football);
for (const worker of manager.workers) {
if (worker === messenger) {
expect(worker.fork.send.callCount).toBe(0);
expect(worker.fork.send).not.toHaveBeenCalled();
} else {
expect(worker.fork.send.firstCall.args[0]).toBe(football);
expect(worker.fork.send).toHaveBeenCalledTimes(1);
expect(worker.fork.send).toHaveBeenCalledWith(football);
}
}
});

describe('interaction with BasePathProxy', () => {
test('correctly configures `BasePathProxy`.', async () => {
const basePathProxyMock = { start: jest.fn() };

ClusterManager.create({}, {}, basePathProxyMock);

expect(basePathProxyMock.start).toHaveBeenCalledWith({
shouldRedirectFromOldBasePath: expect.any(Function),
blockUntil: expect.any(Function),
});
});

describe('proxy is configured with the correct `shouldRedirectFromOldBasePath` and `blockUntil` functions.', () => {
let clusterManager;
let shouldRedirectFromOldBasePath;
let blockUntil;
beforeEach(async () => {
const basePathProxyMock = { start: jest.fn() };

clusterManager = ClusterManager.create({}, {}, basePathProxyMock);

jest.spyOn(clusterManager.server, 'addListener');
jest.spyOn(clusterManager.server, 'removeListener');

[[{ blockUntil, shouldRedirectFromOldBasePath }]] = basePathProxyMock.start.mock.calls;
});

test('`shouldRedirectFromOldBasePath()` returns `false` for unknown paths.', () => {
expect(shouldRedirectFromOldBasePath('')).toBe(false);
expect(shouldRedirectFromOldBasePath('some-path/')).toBe(false);
expect(shouldRedirectFromOldBasePath('some-other-path')).toBe(false);
});

test('`shouldRedirectFromOldBasePath()` returns `true` for `app` and other known paths.', () => {
expect(shouldRedirectFromOldBasePath('app/')).toBe(true);
expect(shouldRedirectFromOldBasePath('login')).toBe(true);
expect(shouldRedirectFromOldBasePath('logout')).toBe(true);
expect(shouldRedirectFromOldBasePath('status')).toBe(true);
});

test('`blockUntil()` resolves immediately if worker has already crashed.', async () => {
clusterManager.server.crashed = true;

await expect(blockUntil()).resolves.not.toBeDefined();
expect(clusterManager.server.addListener).not.toHaveBeenCalled();
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();
});

test('`blockUntil()` resolves immediately if worker is already listening.', async () => {
clusterManager.server.listening = true;

await expect(blockUntil()).resolves.not.toBeDefined();
expect(clusterManager.server.addListener).not.toHaveBeenCalled();
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();
});

test('`blockUntil()` resolves when worker crashes.', async () => {
const blockUntilPromise = blockUntil();

expect(clusterManager.server.addListener).toHaveBeenCalledTimes(2);
expect(clusterManager.server.addListener).toHaveBeenCalledWith(
'crashed',
expect.any(Function)
);

const [, [eventName, onCrashed]] = clusterManager.server.addListener.mock.calls;
// Check event name to make sure we call the right callback,
// in Jest 23 we could use `toHaveBeenNthCalledWith` instead.
expect(eventName).toBe('crashed');
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();

onCrashed();
await expect(blockUntilPromise).resolves.not.toBeDefined();

expect(clusterManager.server.removeListener).toHaveBeenCalledTimes(2);
});

test('`blockUntil()` resolves when worker starts listening.', async () => {
const blockUntilPromise = blockUntil();

expect(clusterManager.server.addListener).toHaveBeenCalledTimes(2);
expect(clusterManager.server.addListener).toHaveBeenCalledWith(
'listening',
expect.any(Function)
);

const [[eventName, onListening]] = clusterManager.server.addListener.mock.calls;
// Check event name to make sure we call the right callback,
// in Jest 23 we could use `toHaveBeenNthCalledWith` instead.
expect(eventName).toBe('listening');
expect(clusterManager.server.removeListener).not.toHaveBeenCalled();

onListening();
await expect(blockUntilPromise).resolves.not.toBeDefined();

expect(clusterManager.server.removeListener).toHaveBeenCalledTimes(2);
});
});
});
});
Loading

0 comments on commit 3d6de7c

Please sign in to comment.