Skip to content

Commit

Permalink
src: slim down stream_base-inl.h
Browse files Browse the repository at this point in the history
PR-URL: #46972
Refs: #43712
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
  • Loading branch information
lilsweetcaligula authored and targos committed May 2, 2023
1 parent ba9ec91 commit adc1601
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 140 deletions.
135 changes: 0 additions & 135 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,6 @@ void StreamResource::PushStreamListener(StreamListener* listener) {
listener_ = listener;
}

void StreamResource::RemoveStreamListener(StreamListener* listener) {
CHECK_NOT_NULL(listener);

StreamListener* previous;
StreamListener* current;

// Remove from the linked list.
for (current = listener_, previous = nullptr;
/* No loop condition because we want a crash if listener is not found */
; previous = current, current = current->previous_listener_) {
CHECK_NOT_NULL(current);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}

listener->stream_ = nullptr;
listener->previous_listener_ = nullptr;
}

uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
DebugSealHandleScope seal_handle_scope;
return listener_->OnStreamAlloc(suggested_size);
Expand Down Expand Up @@ -122,101 +98,6 @@ StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
}

int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();

v8::HandleScope handle_scope(env->isolate());

if (req_wrap_obj.IsEmpty()) {
if (!env->shutdown_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return UV_EBUSY;
}
StreamReq::ResetObject(req_wrap_obj);
}

BaseObjectPtr<AsyncWrap> req_wrap_ptr;
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
if (req_wrap != nullptr)
req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
int err = DoShutdown(req_wrap);

if (err != 0 && req_wrap != nullptr) {
req_wrap->Dispose();
}

const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return UV_EBUSY;
}
ClearError();
}

return err;
}

StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
v8::Local<v8::Object> req_wrap_obj,
bool skip_try_write) {
Environment* env = stream_env();
int err;

size_t total_bytes = 0;
for (size_t i = 0; i < count; ++i)
total_bytes += bufs[i].len;
bytes_written_ += total_bytes;

if (send_handle == nullptr && !skip_try_write) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult { false, err, nullptr, total_bytes, {} };
}
}

v8::HandleScope handle_scope(env->isolate());

if (req_wrap_obj.IsEmpty()) {
if (!env->write_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
StreamReq::ResetObject(req_wrap_obj);
}

AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());

err = DoWrite(req_wrap, bufs, count, send_handle);
bool async = err == 0;

if (!async) {
req_wrap->Dispose();
req_wrap = nullptr;
}

const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
ClearError();
}

return StreamWriteResult {
async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
}

template <typename OtherBase>
SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
StreamBase* stream,
Expand Down Expand Up @@ -278,22 +159,6 @@ void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
backing_store_ = std::move(bs);
}

void StreamReq::Done(int status, const char* error_str) {
AsyncWrap* async_wrap = GetAsyncWrap();
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
if (async_wrap->object()->Set(
env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str)).IsNothing()) {
return;
}
}

OnDone(status);
}

void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);

Expand Down
138 changes: 138 additions & 0 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,103 @@ using v8::Signature;
using v8::String;
using v8::Value;

int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();

v8::HandleScope handle_scope(env->isolate());

if (req_wrap_obj.IsEmpty()) {
if (!env->shutdown_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return UV_EBUSY;
}
StreamReq::ResetObject(req_wrap_obj);
}

BaseObjectPtr<AsyncWrap> req_wrap_ptr;
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
int err = DoShutdown(req_wrap);

if (err != 0 && req_wrap != nullptr) {
req_wrap->Dispose();
}

const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg))
.IsNothing()) {
return UV_EBUSY;
}
ClearError();
}

return err;
}

StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
v8::Local<v8::Object> req_wrap_obj,
bool skip_try_write) {
Environment* env = stream_env();
int err;

size_t total_bytes = 0;
for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
bytes_written_ += total_bytes;

if (send_handle == nullptr && !skip_try_write) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult{false, err, nullptr, total_bytes, {}};
}
}

v8::HandleScope handle_scope(env->isolate());

if (req_wrap_obj.IsEmpty()) {
if (!env->write_wrap_template()
->NewInstance(env->context())
.ToLocal(&req_wrap_obj)) {
return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
}
StreamReq::ResetObject(req_wrap_obj);
}

AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());

err = DoWrite(req_wrap, bufs, count, send_handle);
bool async = err == 0;

if (!async) {
req_wrap->Dispose();
req_wrap = nullptr;
}

const char* msg = Error();
if (msg != nullptr) {
if (req_wrap_obj
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg))
.IsNothing()) {
return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
}
ClearError();
}

return StreamWriteResult{
async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
}

template int StreamBase::WriteString<ASCII>(
const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UTF8>(
Expand Down Expand Up @@ -680,6 +777,30 @@ StreamResource::~StreamResource() {
}
}

void StreamResource::RemoveStreamListener(StreamListener* listener) {
CHECK_NOT_NULL(listener);

StreamListener* previous;
StreamListener* current;

// Remove from the linked list.
// No loop condition because we want a crash if listener is not found.
for (current = listener_, previous = nullptr;;
previous = current, current = current->previous_listener_) {
CHECK_NOT_NULL(current);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}

listener->stream_ = nullptr;
listener->previous_listener_ = nullptr;
}

ShutdownWrap* StreamBase::CreateShutdownWrap(
Local<Object> object) {
auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
Expand All @@ -694,4 +815,21 @@ WriteWrap* StreamBase::CreateWriteWrap(
return wrap;
}

void StreamReq::Done(int status, const char* error_str) {
AsyncWrap* async_wrap = GetAsyncWrap();
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
if (async_wrap->object()
->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str))
.IsNothing()) {
return;
}
}

OnDone(status);
}

} // namespace node
9 changes: 4 additions & 5 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StreamReq {

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
inline void Done(int status, const char* error_str = nullptr);
void Done(int status, const char* error_str = nullptr);
inline void Dispose();

StreamBase* stream() const { return stream_; }
Expand Down Expand Up @@ -276,7 +276,7 @@ class StreamResource {
inline void PushStreamListener(StreamListener* listener);
// Remove a listener, and, if this was the currently active one,
// transfer ownership back to the previous listener.
inline void RemoveStreamListener(StreamListener* listener);
void RemoveStreamListener(StreamListener* listener);

protected:
// Call the current listener's OnStreamAlloc() method.
Expand Down Expand Up @@ -339,8 +339,7 @@ class StreamBase : public StreamResource {
// ShutdownWrap object (that was created in JS), or a new one will be created.
// Returns 1 in case of a synchronous completion, 0 in case of asynchronous
// completion, and a libuv error case in case of synchronous failure.
inline int Shutdown(
v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
Expand All @@ -353,7 +352,7 @@ class StreamBase : public StreamResource {
// write is too large to finish synchronously.
// If the return value indicates a synchronous completion, no callback will
// be invoked.
inline StreamWriteResult Write(
StreamWriteResult Write(
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle = nullptr,
Expand Down

0 comments on commit adc1601

Please sign in to comment.