Skip to content
This repository has been archived by the owner on Mar 31, 2024. It is now read-only.

Commit

Permalink
Revert "Revert "[kbn-es] await native realm setup, error if there are…
Browse files Browse the repository at this point in the history
… failures (elastic#36290)""

This reverts commit a102ca1.
  • Loading branch information
spalger committed May 10, 2019
1 parent a102ca1 commit a17c301
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 129 deletions.
87 changes: 54 additions & 33 deletions packages/kbn-es/src/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ const { createCliError } = require('./errors');
const { promisify } = require('util');
const treeKillAsync = promisify(require('tree-kill'));

// listen to data on stream until map returns anything but undefined
const first = (stream, map) =>
new Promise(resolve => {
const onData = data => {
const result = map(data);
if (result !== undefined) {
resolve(result);
stream.removeListener('data', onData);
}
};
stream.on('data', onData);
});

exports.Cluster = class Cluster {
constructor(log = defaultLog) {
this._log = log;
Expand Down Expand Up @@ -158,14 +171,15 @@ exports.Cluster = class Cluster {
this._exec(installPath, options);

await Promise.race([
// await the "started" log message
new Promise(resolve => {
this._process.stdout.on('data', data => {
// wait for native realm to be setup and es to be started
Promise.all([
first(this._process.stdout, data => {
if (/started/.test(data)) {
resolve();
return true;
}
});
}),
}),
this._nativeRealmSetup,
]),

// await the outcome of the process in case it exits before starting
this._outcome.then(() => {
Expand All @@ -185,6 +199,12 @@ exports.Cluster = class Cluster {
async run(installPath, options = {}) {
this._exec(installPath, options);

// log native realm setup errors so they aren't uncaught
this._nativeRealmSetup.catch(error => {
this._log.error(error);
this.stop();
});

// await the final outcome of the process
await this._outcome;
}
Expand Down Expand Up @@ -241,42 +261,43 @@ exports.Cluster = class Cluster {
stdio: ['ignore', 'pipe', 'pipe'],
});

// parse log output to find http port
const httpPort = first(this._process.stdout, data => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);

if (match) {
return match[1];
}
});

// once the http port is available setup the native realm
this._nativeRealmSetup = httpPort.then(async port => {
const nativeRealm = new NativeRealm(options.password, port, this._log);
await nativeRealm.setPasswords(options);
});

// parse and forward es stdout to the log
this._process.stdout.on('data', data => {
const lines = parseEsLog(data.toString());
lines.forEach(line => {
this._log.info(line.formattedMessage);

// once we have the port we can stop checking for it
if (this.httpPort) {
return;
}

const httpAddressMatch = line.message.match(
/HttpServer.+publish_address {[0-9.]+:([0-9]+)/
);

if (httpAddressMatch) {
this.httpPort = httpAddressMatch[1];
new NativeRealm(options.password, this.httpPort, this._log).setPasswords(options);
}
});
});

// forward es stderr to the log
this._process.stderr.on('data', data => this._log.error(chalk.red(data.toString())));

this._outcome = new Promise((resolve, reject) => {
this._process.once('exit', code => {
if (this._stopCalled) {
resolve();
return;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
reject(createCliError(`ES exited with code ${code}`));
} else {
resolve();
}
});
// observe the exit code of the process and reflect in _outcome promies
const exitCode = new Promise(resolve => this._process.once('exit', resolve));
this._outcome = exitCode.then(code => {
if (this._stopCalled) {
return;
}

// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`ES exited with code ${code}`);
}
});
}
};
57 changes: 54 additions & 3 deletions packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,61 @@
* under the License.
*/

const { createServer } = require('http');
const { format: formatUrl } = require('url');
const { exitCode, start } = JSON.parse(process.argv[2]);

if (start) {
console.log('started');
process.exitCode = exitCode;

if (!start) {
return;
}

process.exitCode = exitCode;
let serverUrl;
const server = createServer((req, res) => {
const url = new URL(req.url, serverUrl);
const send = (code, body) => {
res.writeHead(code, { 'content-type': 'application/json' });
res.end(JSON.stringify(body));
};

if (url.pathname === '/_xpack') {
return send(400, {
error: {
reason: 'foo bar',
},
});
}

return send(404, {
error: {
reason: 'not found',
},
});
});

// setup server auto close after 1 second of silence
let serverCloseTimer;
const delayServerClose = () => {
clearTimeout(serverCloseTimer);
serverCloseTimer = setTimeout(() => server.close(), 1000);
};
server.on('request', delayServerClose);
server.on('listening', delayServerClose);

server.listen(0, '127.0.0.1', function() {
const { port, address: hostname } = server.address();
serverUrl = new URL(
formatUrl({
protocol: 'http:',
port,
hostname,
})
);

console.log(
`[o.e.h.AbstractHttpServerTransport] [computer] publish_address {127.0.0.1:${port}}, bound_addresses {[::1]:${port}}, {127.0.0.1:${port}}`
);

console.log('started');
});
165 changes: 83 additions & 82 deletions packages/kbn-es/src/utils/native_realm.js
Original file line number Diff line number Diff line change
@@ -1,82 +1,83 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');

const { log: defaultLog } = require('./log');

exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}

async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);

try {
await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
} catch (e) {
this._log.error(
chalk.red(`unable to set password for ${chalk.bold(username)}: ${e.message}`)
);
}
}

async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}

(await this.getReservedUsers()).forEach(user => {
this.setPassword(user, options[`password.${user}`]);
});
}

async getReservedUsers() {
const users = await this._client.security.getUser();

return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}

async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (e) {
return false;
}
}
};
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');

const { log: defaultLog } = require('./log');

exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}

async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);

await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
}

async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}

const reservedUsers = await this.getReservedUsers();
await Promise.all(
reservedUsers.map(async user => {
await this.setPassword(user, options[`password.${user}`]);
})
);
}

async getReservedUsers() {
const users = await this._client.security.getUser();

return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}

async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (error) {
if (error.meta && error.meta.statusCode === 400) {
return false;
}

throw error;
}
}
};
Loading

0 comments on commit a17c301

Please sign in to comment.