Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shutdown node in-flight #21283

Merged
merged 1 commit into from
Mar 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/api/callback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void InternalCallbackScope::Close() {
HandleScope handle_scope(env_->isolate());

if (!env_->can_call_into_js()) return;
if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
if (failed_ && !env_->is_main_thread() && env_->is_stopping()) {
env_->async_hooks()->clear_async_id_stack();
}

Expand Down
2 changes: 1 addition & 1 deletion src/api/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static bool ShouldAbortOnUncaughtException(Isolate* isolate) {
DebugSealHandleScope scope(isolate);
Environment* env = Environment::GetCurrent(isolate);
return env != nullptr &&
(env->is_main_thread() || !env->is_stopping_worker()) &&
(env->is_main_thread() || !env->is_stopping()) &&
env->should_abort_on_uncaught_toggle()[0] &&
!env->inside_should_not_abort_on_uncaught_scope();
}
Expand Down
8 changes: 3 additions & 5 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "v8.h"
#include "node_perf_common.h"
#include "node_context_data.h"
#include "node_worker.h"

#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -661,7 +660,7 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}

inline bool Environment::can_call_into_js() const {
return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
return can_call_into_js_ && !is_stopping();
}

inline void Environment::set_can_call_into_js(bool can_call_into_js) {
Expand Down Expand Up @@ -709,9 +708,8 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
sub_worker_contexts_.erase(context);
}

inline bool Environment::is_stopping_worker() const {
CHECK(!is_main_thread());
return worker_context_->is_stopped();
inline bool Environment::is_stopping() const {
return thread_stopper_.IsStopped();
addaleax marked this conversation as resolved.
Show resolved Hide resolved
}

inline performance::performance_state* Environment::performance_state() {
Expand Down
62 changes: 62 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));

GetAsyncRequest()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Environment* env = static_cast<Environment*>(handle->data);
uv_stop(env->event_loop());
});
GetAsyncRequest()->SetStopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
Expand All @@ -355,6 +363,12 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_key_set(&thread_local_env, this);
}

void Environment::ExitEnv() {
set_can_call_into_js(false);
GetAsyncRequest()->Stop();
isolate_->TerminateExecution();
}

MaybeLocal<Object> Environment::ProcessCliArgs(
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args) {
Expand Down Expand Up @@ -519,6 +533,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
GetAsyncRequest()->Uninstall();
CleanupHandles();

while (!cleanup_hooks_.empty()) {
Expand Down Expand Up @@ -932,6 +947,53 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
return new_data;
}

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
env_ = env;
async_ = new uv_async_t;
async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) {
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
async_ = nullptr;
}
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() const {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

AsyncRequest::~AsyncRequest() {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
}

// Not really any better place than env.cc at this moment.
void BaseObject::DeleteMe(void* data) {
BaseObject* self = static_cast<BaseObject*>(data);
Expand Down
29 changes: 28 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,27 @@ struct AllocatedBuffer {
friend class Environment;
};

class AsyncRequest : public MemoryRetainer {
public:
AsyncRequest() {}
~AsyncRequest();
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
};

class Environment {
public:
class AsyncHooks {
Expand Down Expand Up @@ -695,6 +716,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
void Exit(int code);
void ExitEnv();

// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
Expand Down Expand Up @@ -844,7 +866,7 @@ class Environment {
inline void add_sub_worker_context(worker::Worker* context);
inline void remove_sub_worker_context(worker::Worker* context);
void stop_sub_worker_contexts();
inline bool is_stopping_worker() const;
inline bool is_stopping() const;

inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
Expand Down Expand Up @@ -1018,6 +1040,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }

inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }

private:
inline void CreateImmediate(native_immediate_callback cb,
Expand Down Expand Up @@ -1174,6 +1197,10 @@ class Environment {
uint64_t cleanup_hook_counter_ = 0;
bool started_cleanup_ = false;

// A custom async abstraction (a pair of async handle and a state variable)
// Used by embedders to shutdown running Node instance.
AsyncRequest thread_stopper_;

static void EnvPromiseHook(v8::PromiseHookType type,
v8::Local<v8::Promise> promise,
v8::Local<v8::Value> parent);
Expand Down
2 changes: 1 addition & 1 deletion src/module_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void ModuleWrap::Evaluate(const FunctionCallbackInfo<Value>& args) {

// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
if (!env->is_main_thread() && env->is_stopping_worker())
if (!env->is_main_thread() && env->is_stopping())
return;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
Expand Down
10 changes: 7 additions & 3 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -832,15 +832,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);

more = uv_loop_alive(env.event_loop());
if (more)
continue;
if (more && !env.GetAsyncRequest()->IsStopped()) continue;

RunBeforeExit(&env);

// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
} while (more == true);
} while (more == true && !env.GetAsyncRequest()->IsStopped());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
Expand Down Expand Up @@ -977,6 +976,11 @@ int Start(int argc, char** argv) {
return exit_code;
}

int Stop(Environment* env) {
env->ExitEnv();
return 0;
}
addaleax marked this conversation as resolved.
Show resolved Hide resolved

} // namespace node

#if !HAVE_INSPECTOR
Expand Down
10 changes: 7 additions & 3 deletions src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,17 @@ typedef intptr_t ssize_t;

namespace node {

class IsolateData;
class Environment;

// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN int Start(int argc, char* argv[]);

// Tear down Node.js while it is running (there are active handles
// in the loop and / or actively executing JavaScript code).
NODE_EXTERN int Stop(Environment* env);

// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN void Init(int* argc,
Expand Down Expand Up @@ -239,9 +246,6 @@ class NODE_EXTERN ArrayBufferAllocator : public v8::ArrayBuffer::Allocator {
NODE_EXTERN ArrayBufferAllocator* CreateArrayBufferAllocator();
NODE_EXTERN void FreeArrayBufferAllocator(ArrayBufferAllocator* allocator);

class IsolateData;
class Environment;

class NODE_EXTERN MultiIsolatePlatform : public v8::Platform {
public:
~MultiIsolatePlatform() override { }
Expand Down
2 changes: 1 addition & 1 deletion src/node_contextify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ bool ContextifyScript::EvalMachine(Environment* env,

// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
if (!env->is_main_thread() && env->is_stopping_worker())
if (!env->is_main_thread() && env->is_stopping())
return false;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
Expand Down
Loading