Skip to content

Commit

Permalink
Basic pthreads support for Node.js Workers (#9745)
Browse files Browse the repository at this point in the history
This ended up tricky because of various node/web differences, like the global
scope, the event loop, APIs, etc. With help on #6567 I think this PR finally
manages to make it work, at least for an initial "hello world" type test.
Thanks to everyone on that issue and in particular @addaleax!

This moves some code from shell.js into side .js files, so that we can use it in
multiple places.

Fixes #6567 but as we add more tests I'm sure we'll find more issues, this just
shows basic functionality so far. We may not be able to easily reuse the
existing browser tests as they have browser-specific things in them, but we
can try.
  • Loading branch information
kripken committed Nov 1, 2019
1 parent 9483894 commit 5e3029d
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 101 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Current Trunk
- Add `pthread_tryjoin_np`, which is a POSIX API similar to `pthread_join`
but without blocking.
- New function emscripten_has_asyncify()
- Add support for pthreads in Node.js, using Node Workers. See #9745

v1.39.1: 10/30/2019
-------------------
Expand Down
8 changes: 7 additions & 1 deletion src/compiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,16 @@ Runtime.QUANTUM_SIZE = 4;

var ENVIRONMENTS = ENVIRONMENT.split(',');
ENVIRONMENT_MAY_BE_WEB = !ENVIRONMENT || ENVIRONMENTS.indexOf('web') >= 0;
ENVIRONMENT_MAY_BE_WORKER = !ENVIRONMENT || ENVIRONMENTS.indexOf('worker') >= 0;
ENVIRONMENT_MAY_BE_NODE = !ENVIRONMENT || ENVIRONMENTS.indexOf('node') >= 0;
ENVIRONMENT_MAY_BE_SHELL = !ENVIRONMENT || ENVIRONMENTS.indexOf('shell') >= 0;

// The worker case also includes Node.js workers when pthreads are
// enabled and Node.js is one of the supported environments for the build to
// run on. Node.js workers are detected as a combination of
// ENVIRONMENT_IS_WORKER and ENVIRONMENT_HAS_NODE.
ENVIRONMENT_MAY_BE_WORKER = !ENVIRONMENT || ENVIRONMENTS.indexOf('worker') >= 0 ||
(ENVIRONMENT_MAY_BE_NODE && USE_PTHREADS);

if (ENVIRONMENT && !(ENVIRONMENT_MAY_BE_WEB || ENVIRONMENT_MAY_BE_WORKER || ENVIRONMENT_MAY_BE_NODE || ENVIRONMENT_MAY_BE_SHELL)) {
throw 'Invalid environment specified in "ENVIRONMENT": ' + ENVIRONMENT + '. Should be one of: web, worker, node, shell.';
}
Expand Down
18 changes: 18 additions & 0 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ var LibraryPThread = {
worker.onerror = function(e) {
err('pthread sent an error! ' + e.filename + ':' + e.lineno + ': ' + e.message);
};

if (ENVIRONMENT_HAS_NODE) {
worker.on('message', function(data) {
worker.onmessage({ data: data });
});
worker.on('error', function(data) {
worker.onerror(data.err);
});
worker.on('exit', function(data) {
console.log('worker exited - TODO: update the worker queue?');
});
}
}(worker));
} // for each worker
},
Expand Down Expand Up @@ -503,6 +515,7 @@ var LibraryPThread = {
if (ENVIRONMENT_IS_PTHREAD) throw 'Internal Error! _spawn_thread() can only ever be called from main application thread!';

var worker = PThread.getNewWorker();

if (worker.pthread !== undefined) throw 'Internal error!';
if (!threadParams.pthread_ptr) throw 'Internal error, no pthread ptr!';
PThread.runningWorkers.push(worker);
Expand Down Expand Up @@ -944,6 +957,11 @@ var LibraryPThread = {
else PThread.threadExit(status);
#if WASM_BACKEND
// pthread_exit is marked noReturn, so we must not return from it.
if (ENVIRONMENT_HAS_NODE) {
// exit the pthread properly on node, as a normal JS exception will halt
// the entire application.
process.exit(status);
}
throw 'pthread_exit';
#endif
},
Expand Down
25 changes: 25 additions & 0 deletions src/node_shell_read.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
read_ = function shell_read(filename, binary) {
var ret;
#if SUPPORT_BASE64_EMBEDDING
ret = tryParseAsDataURI(filename);
if (!ret) {
#endif
if (!nodeFS) nodeFS = require('fs');
if (!nodePath) nodePath = require('path');
filename = nodePath['normalize'](filename);
ret = nodeFS['readFileSync'](filename);
#if SUPPORT_BASE64_EMBEDDING
}
#endif
return binary ? ret : ret.toString();
};

readBinary = function readBinary(filename) {
var ret = read_(filename, true);
if (!ret.buffer) {
ret = new Uint8Array(ret);
}
assert(ret.buffer);
return ret;
};

8 changes: 7 additions & 1 deletion src/runtime_init_memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ if (ENVIRONMENT_IS_PTHREAD) {
#endif
});
#if USE_PTHREADS
assert(wasmMemory.buffer instanceof SharedArrayBuffer, 'requested a shared WebAssembly.Memory but the returned buffer is not a SharedArrayBuffer, indicating that while the browser has SharedArrayBuffer it does not have WebAssembly threads support - you may need to set a flag');
if (!(wasmMemory.buffer instanceof SharedArrayBuffer)) {
err('requested a shared WebAssembly.Memory but the returned buffer is not a SharedArrayBuffer, indicating that while the browser has SharedArrayBuffer it does not have WebAssembly threads support - you may need to set a flag');
if (ENVIRONMENT_HAS_NODE) {
console.log('(on node you may need: --experimental-wasm-threads --experimental-wasm-bulk-memory and also use a recent version)');
}
throw Error('bad memory');
}
#endif
}

Expand Down
135 changes: 46 additions & 89 deletions src/shell.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ if (Module['ENVIRONMENT']) {
var _scriptDir = import.meta.url;
#else
var _scriptDir = (typeof document !== 'undefined' && document.currentScript) ? document.currentScript.src : undefined;

if (ENVIRONMENT_IS_NODE) {
_scriptDir = __filename;
}
#endif
#endif

Expand All @@ -118,6 +122,9 @@ var read_,
setWindowTitle;

#if ENVIRONMENT_MAY_BE_NODE
var nodeFS;
var nodePath;

if (ENVIRONMENT_IS_NODE) {
#if ENVIRONMENT
#if ASSERTIONS
Expand All @@ -126,35 +133,7 @@ if (ENVIRONMENT_IS_NODE) {
#endif
scriptDirectory = __dirname + '/';

// Expose functionality in the same simple way that the shells work
// Note that we pollute the global namespace here, otherwise we break in node
var nodeFS;
var nodePath;

read_ = function shell_read(filename, binary) {
var ret;
#if SUPPORT_BASE64_EMBEDDING
ret = tryParseAsDataURI(filename);
if (!ret) {
#endif
if (!nodeFS) nodeFS = require('fs');
if (!nodePath) nodePath = require('path');
filename = nodePath['normalize'](filename);
ret = nodeFS['readFileSync'](filename);
#if SUPPORT_BASE64_EMBEDDING
}
#endif
return binary ? ret : ret.toString();
};

readBinary = function readBinary(filename) {
var ret = read_(filename, true);
if (!ret.buffer) {
ret = new Uint8Array(ret);
}
assert(ret.buffer);
return ret;
};
#include "node_shell_read.js"

if (process['argv'].length > 1) {
thisProgram = process['argv'][1].replace(/\\/g, '/');
Expand Down Expand Up @@ -188,6 +167,18 @@ if (ENVIRONMENT_IS_NODE) {
};

Module['inspect'] = function () { return '[Emscripten Module object]'; };

#if USE_PTHREADS
var nodeWorkerThreads;
try {
nodeWorkerThreads = require('worker_threads');
} catch (e) {
console.error('The "worker_threads" module is not supported in this node.js build - perhaps a newer version is needed?');
throw e;
}
Worker = nodeWorkerThreads.Worker;
#endif

} else
#endif // ENVIRONMENT_MAY_BE_NODE
#if ENVIRONMENT_MAY_BE_SHELL
Expand Down Expand Up @@ -247,6 +238,10 @@ if (ENVIRONMENT_IS_SHELL) {
}
} else
#endif // ENVIRONMENT_MAY_BE_SHELL

// Note that this includes Node.js workers when relevant (pthreads is enabled).
// Node.js workers are detected as a combination of ENVIRONMENT_IS_WORKER and
// ENVIRONMENT_HAS_NODE.
#if ENVIRONMENT_MAY_BE_WEB || ENVIRONMENT_MAY_BE_WORKER
if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) {
if (ENVIRONMENT_IS_WORKER) { // Check worker, not web, since window could be polyfilled
Expand Down Expand Up @@ -277,68 +272,20 @@ if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) {
#endif
#endif

read_ = function shell_read(url) {
#if SUPPORT_BASE64_EMBEDDING
try {
#endif
var xhr = new XMLHttpRequest();
xhr.open('GET', url, false);
xhr.send(null);
return xhr.responseText;
#if SUPPORT_BASE64_EMBEDDING
} catch (err) {
var data = tryParseAsDataURI(url);
if (data) {
return intArrayToString(data);
}
throw err;
}
#endif
};
// Differentiate the Web Worker from the Node Worker case, as reading must
// be done differently.
#if USE_PTHREADS
if (ENVIRONMENT_HAS_NODE) {

if (ENVIRONMENT_IS_WORKER) {
readBinary = function readBinary(url) {
#if SUPPORT_BASE64_EMBEDDING
try {
#endif
var xhr = new XMLHttpRequest();
xhr.open('GET', url, false);
xhr.responseType = 'arraybuffer';
xhr.send(null);
return new Uint8Array(xhr.response);
#if SUPPORT_BASE64_EMBEDDING
} catch (err) {
var data = tryParseAsDataURI(url);
if (data) {
return data;
}
throw err;
}
#endif
};
}
#include "node_shell_read.js"

readAsync = function readAsync(url, onload, onerror) {
var xhr = new XMLHttpRequest();
xhr.open('GET', url, true);
xhr.responseType = 'arraybuffer';
xhr.onload = function xhr_onload() {
if (xhr.status == 200 || (xhr.status == 0 && xhr.response)) { // file URLs can return 0
onload(xhr.response);
return;
}
#if SUPPORT_BASE64_EMBEDDING
var data = tryParseAsDataURI(url);
if (data) {
onload(data.buffer);
return;
}
} else
#endif
onerror();
};
xhr.onerror = onerror;
xhr.send(null);
};
{

#include "web_or_worker_shell_read.js"

}

setWindowTitle = function(title) { document.title = title };
} else
Expand All @@ -349,6 +296,16 @@ if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) {
#endif // ASSERTIONS
}

#if ENVIRONMENT_MAY_BE_NODE && USE_PTHREADS
if (ENVIRONMENT_HAS_NODE) {
// Polyfill the performance object, which emscripten pthreads support
// depends on for good timing.
if (typeof performance === 'undefined') {
performance = require('perf_hooks').performance;
}
}
#endif

// Set up the out() and err() hooks, which are how we can print to stdout or
// stderr, respectively.
{{{ makeModuleReceiveWithVar('out', 'print', 'console.log.bind(console)', true) }}}
Expand Down Expand Up @@ -393,7 +350,7 @@ assert(typeof Module['setWindowTitle'] === 'undefined', 'Module.setWindowTitle o
{{{ makeRemovedFSAssert('NODEFS') }}}

#if USE_PTHREADS
assert(ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER, 'Pthreads do not work in non-browser environments yet (need Web Workers, or an alternative to them)');
assert(ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER || ENVIRONMENT_IS_NODE, 'Pthreads do not work in this environment yet (need Web Workers, or an alternative to them)');
#endif // USE_PTHREADS
#endif // ASSERTIONS

Expand Down
63 changes: 63 additions & 0 deletions src/web_or_worker_shell_read.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
read_ = function shell_read(url) {
#if SUPPORT_BASE64_EMBEDDING
try {
#endif
var xhr = new XMLHttpRequest();
xhr.open('GET', url, false);
xhr.send(null);
return xhr.responseText;
#if SUPPORT_BASE64_EMBEDDING
} catch (err) {
var data = tryParseAsDataURI(url);
if (data) {
return intArrayToString(data);
}
throw err;
}
#endif
};

if (ENVIRONMENT_IS_WORKER) {
readBinary = function readBinary(url) {
#if SUPPORT_BASE64_EMBEDDING
try {
#endif
var xhr = new XMLHttpRequest();
xhr.open('GET', url, false);
xhr.responseType = 'arraybuffer';
xhr.send(null);
return new Uint8Array(xhr.response);
#if SUPPORT_BASE64_EMBEDDING
} catch (err) {
var data = tryParseAsDataURI(url);
if (data) {
return data;
}
throw err;
}
#endif
};
}

readAsync = function readAsync(url, onload, onerror) {
var xhr = new XMLHttpRequest();
xhr.open('GET', url, true);
xhr.responseType = 'arraybuffer';
xhr.onload = function xhr_onload() {
if (xhr.status == 200 || (xhr.status == 0 && xhr.response)) { // file URLs can return 0
onload(xhr.response);
return;
}
#if SUPPORT_BASE64_EMBEDDING
var data = tryParseAsDataURI(url);
if (data) {
onload(data.buffer);
return;
}
#endif
onerror();
};
xhr.onerror = onerror;
xhr.send(null);
};

Loading

0 comments on commit 5e3029d

Please sign in to comment.