Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop reading from stdin after programmatic API finishes #253

Merged
merged 12 commits into from
Aug 8, 2021
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,19 @@ For more details, visit https://github.com/kimmobrunfeldt/concurrently
concurrently can be used programmatically by using the API documented below:

### `concurrently(commands[, options])`

- `commands`: an array of either strings (containing the commands to run) or objects
with the shape `{ command, name, prefixColor, env, cwd }`.

- `options` (optional): an object containing any of the below:
- `cwd`: the working directory to be used by all commands. Can be overriden per command.
Default: `process.cwd()`.
- `defaultInputTarget`: the default input target when reading from `inputStream`.
Default: `0`.
- `handleInput`: when `true`, reads input from `process.stdin`.
- `inputStream`: a [`Readable` stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams)
to read the input from, eg `process.stdin`.
to read the input from. Should only be used in the rare instance you would like to stream anything other than `process.stdin`. Overrides `handleInput`.
- `pauseInputStreamOnFinish`: by default, pauses the input stream (`process.stdin` when `handleInput` is enabled, or `inputStream` if provided) when all of the processes have finished. If you need to read from the input stream after `concurrently` has finished, set this to `false`. ([#252](https://github.com/kimmobrunfeldt/concurrently/issues/252)).
- `killOthers`: an array of exitting conditions that will cause a process to kill others.
Can contain any of `success` or `failure`.
- `maxProcesses`: how many processes should run at once.
Expand Down
2 changes: 1 addition & 1 deletion bin/concurrently.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ concurrently(args._.map((command, index) => {
name: names[index]
};
}), {
inputStream: args.handleInput && process.stdin,
handleInput: args.handleInput,
defaultInputTarget: args.defaultInputTarget,
killOthers: args.killOthers
? ['success', 'failure']
Expand Down
6 changes: 3 additions & 3 deletions bin/concurrently.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ describe('--names', () => {
});

describe('--prefix', () => {
it('is alised to -p', done => {
it('is aliased to -p', done => {
const child = run('-p command "echo foo" "echo bar"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[echo foo] foo'));
Expand All @@ -213,7 +213,7 @@ describe('--prefix', () => {
});

describe('--prefix-length', () => {
it('is alised to -l', done => {
it('is aliased to -l', done => {
const child = run('-p command -l 5 "echo foo" "echo bar"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[ec..o] foo'));
Expand Down Expand Up @@ -247,7 +247,7 @@ describe('--restart-tries', () => {
});

describe('--kill-others', () => {
it('is alised to -k', done => {
it('is aliased to -k', done => {
const child = run('-k "sleep 10" "exit 0"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[1] exit 0 exited with code 0'));
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ module.exports = exports = (commands, options = {}) => {
new InputHandler({
logger,
defaultInputTarget: options.defaultInputTarget,
inputStream: options.inputStream,
inputStream: options.inputStream || (options.handleInput && process.stdin),
pauseInputStreamOnFinish: options.pauseInputStreamOnFinish,
}),
new KillOnSignal({ process }),
new RestartProcess({
Expand Down
19 changes: 15 additions & 4 deletions src/concurrently.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,29 @@ module.exports = (commands, options) => {
))
.value();

commands = options.controllers.reduce(
(prevCommands, controller) => controller.handle(prevCommands),
commands
const handleResult = options.controllers.reduce(
({ commands: prevCommands, onFinishCallbacks }, controller) => {
const { commands, onFinish } = controller.handle(prevCommands);
return {
commands,
onFinishCallbacks: _.concat(onFinishCallbacks, onFinish ? [onFinish] : [])
}
},
{ commands, onFinishCallbacks: [] }
);
commands = handleResult.commands

const commandsLeft = commands.slice();
const maxProcesses = Math.max(1, Number(options.maxProcesses) || commandsLeft.length);
for (let i = 0; i < maxProcesses; i++) {
maybeRunMore(commandsLeft);
}

return new CompletionListener({ successCondition: options.successCondition }).listen(commands);
return new CompletionListener({ successCondition: options.successCondition })
.listen(commands)
.finally(() => {
handleResult.onFinishCallbacks.forEach((onFinish) => onFinish());
});
};

function mapToCommandInfo(command) {
Expand Down
23 changes: 21 additions & 2 deletions src/concurrently.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const EventEmitter = require('events');

const createFakeCommand = require('./flow-control/fixtures/fake-command');
const FakeHandler = require('./flow-control/fixtures/fake-handler');
const concurrently = require('./concurrently');

let spawn, kill, controllers, processes = [];
Expand All @@ -18,7 +19,7 @@ beforeEach(() => {
return process;
});
kill = jest.fn();
controllers = [{ handle: jest.fn(arg => arg) }, { handle: jest.fn(arg => arg) }];
controllers = [new FakeHandler(), new FakeHandler()];
});

it('fails if commands is not an array', () => {
Expand Down Expand Up @@ -85,7 +86,7 @@ it('runs commands with a name or prefix color', () => {

it('passes commands wrapped from a controller to the next one', () => {
const fakeCommand = createFakeCommand('banana', 'banana');
controllers[0].handle.mockReturnValue([fakeCommand]);
controllers[0].handle.mockReturnValue({ commands: [fakeCommand] });

create(['echo']);

Expand Down Expand Up @@ -165,3 +166,21 @@ it('uses overridden cwd option for each command if specified', () => {
cwd: 'foobar',
}));
});

it('runs onFinish hook after all commands run', async () => {
const promise = create(['foo', 'bar'], { maxProcesses: 1 });
expect(spawn).toHaveBeenCalledTimes(1);
expect(controllers[0].onFinish).not.toHaveBeenCalled();
expect(controllers[1].onFinish).not.toHaveBeenCalled();

processes[0].emit('close', 0, null);
expect(spawn).toHaveBeenCalledTimes(2);
expect(controllers[0].onFinish).not.toHaveBeenCalled();
expect(controllers[1].onFinish).not.toHaveBeenCalled();

processes[1].emit('close', 0, null);
await promise;

expect(controllers[0].onFinish).toHaveBeenCalled();
expect(controllers[1].onFinish).toHaveBeenCalled();
})
16 changes: 16 additions & 0 deletions src/flow-control/base-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module.exports = class BaseHandler {
constructor(options = {}) {
const { logger } = options;

this.logger = logger;
}

handle(commands) {
return {
commands,
// an optional callback to call when all commands have finished
// (either successful or not)
onFinish: null,
};
}
};
13 changes: 13 additions & 0 deletions src/flow-control/fixtures/fake-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const BaseHandler = require('../base-handler')

module.exports = class FakeHandler extends BaseHandler {
constructor() {
super();

this.handle = jest.fn(commands => ({
commands,
onFinish: this.onFinish,
}));
this.onFinish = jest.fn();
}
};
21 changes: 16 additions & 5 deletions src/flow-control/input-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ const Rx = require('rxjs');
const { map } = require('rxjs/operators');

const defaults = require('../defaults');
const BaseHandler = require('./base-handler');

module.exports = class InputHandler extends BaseHandler {
constructor({ defaultInputTarget, inputStream, pauseInputStreamOnFinish, logger }) {
super({ logger });

module.exports = class InputHandler {
constructor({ defaultInputTarget, inputStream, logger }) {
this.defaultInputTarget = defaultInputTarget || defaults.defaultInputTarget;
this.inputStream = inputStream;
this.logger = logger;
this.pauseInputStreamOnFinish = pauseInputStreamOnFinish !== false;
}

handle(commands) {
if (!this.inputStream) {
return commands;
return { commands };
}

Rx.fromEvent(this.inputStream, 'data')
Expand All @@ -34,6 +37,14 @@ module.exports = class InputHandler {
}
});

return commands;
return {
commands,
onFinish: () => {
if (this.pauseInputStreamOnFinish) {
// https://github.com/kimmobrunfeldt/concurrently/issues/252
this.inputStream.pause();
}
},
};
}
};
40 changes: 31 additions & 9 deletions src/flow-control/input-handler.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const EventEmitter = require('events');
const stream = require('stream');
const { createMockInstance } = require('jest-create-mock-instance');

const Logger = require('../logger');
Expand All @@ -12,7 +12,7 @@ beforeEach(() => {
createFakeCommand('foo', 'echo foo', 0),
createFakeCommand('bar', 'echo bar', 1),
];
inputStream = new EventEmitter();
inputStream = new stream.PassThrough();
logger = createMockInstance(Logger);
controller = new InputHandler({
defaultInputTarget: 0,
Expand All @@ -22,16 +22,16 @@ beforeEach(() => {
});

it('returns same commands', () => {
expect(controller.handle(commands)).toBe(commands);
expect(controller.handle(commands)).toMatchObject({ commands });

controller = new InputHandler({ logger });
expect(controller.handle(commands)).toBe(commands);
expect(controller.handle(commands)).toMatchObject({ commands });
});

it('forwards input stream to default target ID', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('something'));
inputStream.write('something');

expect(commands[0].stdin.write).toHaveBeenCalledTimes(1);
expect(commands[0].stdin.write).toHaveBeenCalledWith('something');
Expand All @@ -41,7 +41,7 @@ it('forwards input stream to default target ID', () => {
it('forwards input stream to target index specified in input', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('1:something'));
inputStream.write('1:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).toHaveBeenCalledTimes(1);
Expand All @@ -63,7 +63,7 @@ it('forwards input stream to target index specified in input when input contains
it('forwards input stream to target name specified in input', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('bar:something'));
inputStream.write('bar:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).toHaveBeenCalledTimes(1);
Expand All @@ -74,7 +74,7 @@ it('logs error if command has no stdin open', () => {
commands[0].stdin = null;
controller.handle(commands);

inputStream.emit('data', Buffer.from('something'));
inputStream.write('something');

expect(commands[1].stdin.write).not.toHaveBeenCalled();
expect(logger.logGlobalEvent).toHaveBeenCalledWith('Unable to find command 0, or it has no stdin open\n');
Expand All @@ -83,9 +83,31 @@ it('logs error if command has no stdin open', () => {
it('logs error if command is not found', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('foobar:something'));
inputStream.write('foobar:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).not.toHaveBeenCalled();
expect(logger.logGlobalEvent).toHaveBeenCalledWith('Unable to find command foobar, or it has no stdin open\n');
});

it('pauses input stream when finished', () => {
expect(inputStream.readableFlowing).toBeNull();

const { onFinish } = controller.handle(commands);
expect(inputStream.readableFlowing).toBe(true);

onFinish();
expect(inputStream.readableFlowing).toBe(false);
});

it('does not pause input stream when pauseInputStreamOnFinish is set to false', () => {
controller = new InputHandler({ inputStream, pauseInputStreamOnFinish: false });

expect(inputStream.readableFlowing).toBeNull();

const { onFinish } = controller.handle(commands);
expect(inputStream.readableFlowing).toBe(true);

onFinish();
expect(inputStream.readableFlowing).toBe(true);
});
29 changes: 17 additions & 12 deletions src/flow-control/kill-on-signal.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
const { map } = require('rxjs/operators');

const BaseHandler = require('./base-handler');

module.exports = class KillOnSignal {
module.exports = class KillOnSignal extends BaseHandler {
constructor({ process }) {
super();

this.process = process;
}

Expand All @@ -15,16 +18,18 @@ module.exports = class KillOnSignal {
});
});

return commands.map(command => {
const closeStream = command.close.pipe(map(exitInfo => {
const exitCode = caughtSignal === 'SIGINT' ? 0 : exitInfo.exitCode;
return Object.assign({}, exitInfo, { exitCode });
}));
return new Proxy(command, {
get(target, prop) {
return prop === 'close' ? closeStream : target[prop];
}
});
});
return {
commands: commands.map(command => {
const closeStream = command.close.pipe(map(exitInfo => {
const exitCode = caughtSignal === 'SIGINT' ? 0 : exitInfo.exitCode;
return Object.assign({}, exitInfo, { exitCode });
}));
return new Proxy(command, {
get(target, prop) {
return prop === 'close' ? closeStream : target[prop];
}
});
})
};
}
};
6 changes: 3 additions & 3 deletions src/flow-control/kill-on-signal.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ beforeEach(() => {
});

it('returns commands that keep non-close streams from original commands', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
newCommands.forEach((newCommand, i) => {
expect(newCommand.close).not.toBe(commands[i].close);
expect(newCommand.error).toBe(commands[i].error);
Expand All @@ -24,7 +24,7 @@ it('returns commands that keep non-close streams from original commands', () =>
});

it('returns commands that map SIGINT to exit code 0', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
expect(newCommands).not.toBe(commands);
expect(newCommands).toHaveLength(commands.length);

Expand All @@ -40,7 +40,7 @@ it('returns commands that map SIGINT to exit code 0', () => {
});

it('returns commands that keep non-SIGINT exit codes', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
expect(newCommands).not.toBe(commands);
expect(newCommands).toHaveLength(commands.length);

Expand Down
Loading