Extend AsyncClient to support streaming requests / responses #353

Merged: 22 commits, Feb 2, 2017
Changes from 17 commits
88 changes: 87 additions & 1 deletion include/envoy/http/async_client.h
@@ -38,7 +38,45 @@ class AsyncClient {
};

/**
* An in-flight HTTP request
* Notifies caller of async HTTP stream status.

Reviewer: It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance, StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.

Member Author: Done.

* Note that the HTTP stream is full-duplex: even if the local-to-remote stream has been ended
* by Stream.sendHeaders/sendData with end_stream=true or by sendTrailers,
* StreamCallbacks can continue to receive events until the remote-to-local stream is closed,
* and vice versa.
*/
class StreamCallbacks {
public:
virtual ~StreamCallbacks() {}

/**
* Called when all headers are received on the async HTTP stream.
* @param headers the headers received
* @param end_stream whether the response is header-only
*/
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE;

/**
* Called when a data frame is received on the async HTTP stream.
* This can be invoked multiple times when the data is streamed.
* @param data the data received
* @param end_stream whether this is the last data frame
*/
virtual void onData(Buffer::Instance& data, bool end_stream) PURE;

Contributor: It can be invoked multiple times if the data get streamed.

Member Author: Done.

@louiscryan (Jan 24, 2017): How is stream closure signalled even if no data was delivered? It seems cleaner to separate it into its own callback; otherwise closure handling will end up being overloaded into each handler implementation.

Contributor: To me the StreamDecoderFilter is an interface which already provides all the functionality. The Stream interface looks like an unnecessary wrapper on top of it. Using the StreamDecoderFilter directly by getting the router (or a wrapper of the router with an active count, etc.) is more straightforward.

Reviewer: I think I mostly agree about StreamDecoderFilter, but there are many aspects of StreamDecoderFilterCallbacks that people are unlikely to need or want. The fact that an explicit 'reset' handler must be installed (https://github.com/lyft/envoy/blob/89de442d82a1727aa7f53604748e953cc27b4849/include/envoy/http/filter.h#L69) does make the API less obvious than this one.

Reviewer: Hmm... the more I look at this the less convinced I am. The StreamXXXFilter methods are more like the ones I would expect a client to implement, and the StreamXXXFilterCallbacks look more like the write interface. @mattklein123 can probably shed some light on the relationships and why things are implemented this way. For pure client use cases I would expect an API more like the one @lizan wrote.

Member: I think the simple interface that @lizan wrote is the one to use. Here is why this all works this way: the way AsyncClient is implemented, it basically mimics ActiveStream inside the HTTP connection manager and instantiates a router filter to do the work. On one hand this is kind of a hack; on the other hand, it's an implementation detail, so it doesn't really matter. So, in the end, internally, the async client implements the StreamXXXCallback methods, which is the write interface, and then translates those "writes" into async client request/stream callbacks, which is this interface.


/**
* Called when all trailers are received on the async HTTP stream.
* @param trailers the trailers received.
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

Contributor: When all trailers get received.

Member Author: Done.


/**
* Called when the async HTTP stream is reset.
*/
virtual void onReset() PURE;
};
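
For orientation, here is a minimal caller-side sketch (not part of this PR) of a StreamCallbacks implementation that simply buffers what it receives; per the full-duplex note above, these callbacks may keep firing after the local side of the stream has been ended. The class name and include paths are assumptions.

#include <utility>

#include "envoy/http/async_client.h"

#include "common/buffer/buffer_impl.h"

// Illustrative only: accumulate the response and record whether the stream
// completed or was reset.
class BufferingStreamCallbacks : public Http::AsyncClient::StreamCallbacks {
public:
  void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override {
    headers_ = std::move(headers);
    complete_ = end_stream;
  }

  void onData(Buffer::Instance& data, bool end_stream) override {
    body_.move(data); // may be invoked many times as data streams in
    complete_ = end_stream;
  }

  void onTrailers(Http::HeaderMapPtr&& trailers) override {
    trailers_ = std::move(trailers);
    complete_ = true; // trailers implicitly end the remote-to-local stream
  }

  void onReset() override { reset_ = true; }

private:
  Http::HeaderMapPtr headers_;
  Http::HeaderMapPtr trailers_;
  Buffer::OwnedImpl body_;
  bool complete_{};
  bool reset_{};
};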

/**
* An in-flight HTTP request.
*/
class Request {
public:
@@ -50,18 +88,66 @@ class AsyncClient {
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream.
*/
class Stream {
public:
virtual ~Stream() {}

/***
* Send headers to the stream. This method cannot be invoked more than once and
* needs to be called before sendData.
* @param headers supplies the headers to send.
* @param end_stream supplies whether this is a header only request.
*/
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE;

/***
* Send data to the stream. This method can be invoked multiple times if the data is streamed.
* To end the stream without data, call this method with an empty buffer.
* @param data supplies the data to send.
* @param end_stream supplies whether this is the last data.
*/
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;

Contributor: Explicitly say that sendData can be invoked multiple times if the data gets streamed.

Reviewer: See the comment above about closing without data. I don't mind collapsing the data write with closure here, but then this API needs to document how to do that with no data, e.g. using an empty or null buffer?

Reviewer: ... also, what is the behavior if a call is made after end_stream has already occurred?

Contributor: It's a programming error. It looks like an error code should be returned. Without changing the method signature, should it be ignored with an error log?

@louiscryan (Jan 24, 2017): So we need code to distinguish between a programming error (double local close/reset) and a transport failure (local close after remote reset).

Member: A few things:

  1. The codec APIs generally follow the http/2 framing format. Since in that API, if you want to close the stream after headers are complete, but without data, you must send an empty data frame along with the end stream bit, this is what we do also, for better or worse. Feel free to update the docs to account for this.
  2. There are various asserts in different places to account for programmer error, but the main problem with this is object lifetime. Since streams may get destroyed right away, you are not guaranteed to actually be able to check vs. just be dealing with a memory corruption / crash situation, so in general we just have a contract that says that once certain events happen you can't use the object anymore. I've tried to be very clear about this in codec.h and filter.h, but let me know if it's not clear enough.

Member Author: Done; documented that sendData with an empty buffer should be used to half-close the stream.
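
Following the convention described above (an empty data frame carrying the end_stream bit), a minimal sketch, not part of this PR, of half-closing the local side without a body; `stream` is assumed to be a live handle from AsyncClient::start() and `headers` a fully populated request header map, and the include paths are assumptions.

#include "envoy/http/async_client.h"

#include "common/buffer/buffer_impl.h"

void sendHeadersThenHalfClose(Http::AsyncClient::Stream& stream, Http::HeaderMap& headers) {
  // Keep the local-to-remote side open after headers...
  stream.sendHeaders(headers, false);

  // ...then half-close it with an empty data frame and end_stream=true.
  // StreamCallbacks keep firing until the remote-to-local side also closes.
  Buffer::OwnedImpl empty;
  stream.sendData(empty, true);
}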


/***
* Send trailers. This method cannot be invoked more than once, and implicitly ends the stream.
* @param trailers supplies the trailers to send.
*/
virtual void sendTrailers(HeaderMap& trailers) PURE;

Contributor: Same as sendHeaders, sendTrailer cannot be invoked more than once.

Member Author: Done.


/***
* Reset the stream.
*/
virtual void reset() PURE;

Reviewer: Does this call trigger StreamCallbacks.onResetStream, or does that only occur if the RESET is received from the remote side? If we're thread-local then it seems like it should call it.

Member: It does call it, but this is an internal implementation detail of the async client / request, so I don't think we need to expose it from this interface.

Member Author: StreamCallbacks.onReset only occurs if the RESET is received from the remote side.

Member Author: Now reset() will call StreamCallbacks.onReset.

};

virtual ~AsyncClient() {}

/**
* Send an HTTP request asynchronously.
* @param request the request to send.
* @param callbacks the callbacks to be notified of request status.
* @param timeout supplies the request timeout.
* @return a request handle or nullptr if no request could be created. NOTE: In this case
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
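
For comparison with the streaming interface added below, a minimal sketch (not part of this PR) of the buffered request path's Callbacks; the onSuccess/onFailure signatures are inferred from the implementation later in this diff, so treat the exact types and include path as assumptions.

#include <utility>

#include "envoy/http/async_client.h"

// Illustrative only: the whole response arrives at once via onSuccess(), or
// onFailure() reports a reset.
class SimpleRequestCallbacks : public Http::AsyncClient::Callbacks {
public:
  void onSuccess(Http::MessagePtr&& response) override { response_ = std::move(response); }
  void onFailure(Http::AsyncClient::FailureReason) override { failed_ = true; }

private:
  Http::MessagePtr response_;
  bool failed_{};
};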

/**
* Start an HTTP stream asynchronously.
* @param callbacks the callbacks to be notified of stream status.
* @param timeout supplies the stream timeout, measured from when the frame with the end_stream
* flag is sent until the first frame is received.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onResetStream() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;

Member: It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long-lived streaming API requests like you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though.)

Member Author: I don't have a strong opinion either. I will update the comments soon.

Member Author: Done.

Contributor: Explicitly say that if no timeout is given, it's infinite.

@louiscryan (Jan 24, 2017): What is the behavior if the timeout is hit? Does it implicitly call Stream.reset(), and by implication does Callbacks.onResetStream get called, and on what thread?

Member: This is all single-threaded. Yes, when the timeout is hit the stream will be reset with callbacks called.

Member Author: On timeout, it will call StreamCallbacks.onHeaders with :status 504 and then onData; the headers and data come from the underlying filters. See this test: https://github.com/lyft/envoy/pull/353/files#diff-918cf37446b66e452d1114b88b1e61c4R578
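
To illustrate the timeout semantics discussed above, a caller-side sketch (not part of this PR; Optional is the type used in this header, and the include path is an assumption):

#include <chrono>

#include "envoy/http/async_client.h"

// Illustrative only: start a stream with a 5-second timeout. A null handle
// means the stream failed immediately and onReset() was already invoked
// inline. If the timeout later fires, the callbacks see a synthesized 504
// response via onHeaders()/onData() rather than a bare onReset().
Http::AsyncClient::Stream* startWithTimeout(Http::AsyncClient& client,
                                            Http::AsyncClient::StreamCallbacks& callbacks) {
  Optional<std::chrono::milliseconds> timeout(std::chrono::milliseconds(5000));
  return client.start(callbacks, timeout);
}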

};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
195 changes: 128 additions & 67 deletions source/common/http/async_client_impl.cc
@@ -1,15 +1,14 @@
#include "async_client_impl.h"
#include "headers.h"

namespace Http {

const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>>
AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_;
const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_;
const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_;
AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_;
const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_;
const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
@@ -22,120 +21,182 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
dispatcher_(dispatcher) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_requests_.empty()) {
active_requests_.front()->failDueToClientDestroy();
while (!active_streams_.empty()) {
active_streams_.front()->reset();
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
AsyncRequestImpl* async_request =

Member: nit: just initialize this directly in unique_ptr and return front().get() like you do below.

Member Author: Having a raw pointer here is to avoid downcasting from AsyncStreamImpl to AsyncRequestImpl when it returns. AsyncStreamImpl is not a subclass of AsyncClient::Request.

Member: ok sorry sounds good
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout);
std::unique_ptr<AsyncStreamImpl> new_request{async_request};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
if (!new_request->complete()) {

Member: If I'm reading the code correctly, I think you actually want to check for remote_close_ here now, not complete_, or, arguably better, you need to set local_close_ in resetStream(). The reason for this is that the router might respond right away. If it responds before the request is complete, it will reset. In this case, you should return a null handle.

Member: Sorry, just looked at the code in detail (I haven't looked at this stuff in a while). The router will not actually call resetStream() on its own; it relies on the connection manager to reset the request when the response is complete. (The main server makes the assumption that, even in the streaming case, if the response is finished, the request should be reset if it is not finished.) So, in this case, I would probably just check for remote_close_ and, if that is set, return a null handle and do not send any more data. You could also replicate the connection manager behavior, and if remote is closed before local is closed, close local and "reset."

Member: P.S., this stuff is pretty complicated. If you want to chat in realtime, find me in the public Gitter.

Member Author: Looked at the code again; checking remote_close_ sounds reasonable, done.
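
For illustration, the shape the thread converges on might look like the sketch below. This is not the code at this commit (which still checks complete()), and remoteClosed() is a hypothetical accessor for the remote_closed_ flag:

  // Hypothetical sketch: only hand out a request handle if the remote side has
  // not already responded and closed the stream during construction.
  if (!new_request->remoteClosed()) { // remoteClosed() is assumed, not in this diff
    new_request->moveIntoList(std::move(new_request), active_streams_);
    return async_request;
  }
  return nullptr;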

new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
return nullptr;
}
}

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: request_(std::move(request)), parent_(parent), callbacks_(callbacks),
stream_id_(parent.config_.random_.random()), router_(parent.config_),
request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) {
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};

router_.setDecoderFilterCallbacks(*this);
request_->headers().insertEnvoyInternalRequest().value(
Headers::get().EnvoyInternalRequestValues.True);
request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(request_->headers(), !request_->body());
if (!complete_ && request_->body()) {
router_.decodeData(*request_->body(), true);
// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->complete()) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
}

// TODO: Support request trailers.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(parent.config_), request_info_(Protocol::Http11),
route_(parent_.cluster_.name(), timeout) {

router_.setDecoderFilterCallbacks(*this);
// TODO: Correctly set protocol in request info when we support access logging.
}

AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); }
AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); }

void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

Reviewer: @mattklein123 this is some odd nomenclature; why are headers received from the transport handled in a method called 'encode...'?

Member: I explained this above: this is an internal implementation detail. The response is being "encoded" into the client, which then dispatches the appropriate callbacks.

#ifndef NDEBUG
log_debug("async http request response headers (end_stream={}):", end_stream);
response_->headers().iterate([](const HeaderEntry& header, void*) -> void {
headers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

if (end_stream) {
onComplete();
}
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
log_trace("async http request response data (length={} end_stream={})", data.length(),
end_stream);
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
stream_callbacks_.onData(data, end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
#ifndef NDEBUG
log_debug("async http request response trailers:");
response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void {
trailers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

onComplete();
stream_callbacks_.onTrailers(std::move(trailers));
closeRemote(true);
}

void AsyncRequestImpl::cancel() {
reset_callback_();
cleanup();
void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
closeLocal(end_stream);
}

Member: nit: new line in between.

void AsyncRequestImpl::onComplete() {
complete_ = true;
callbacks_.onSuccess(std::move(response_));
cleanup();
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
decoding_buffer_ = &data;

Member: This is not quite right and is kind of complicated. In short:

  1. When you call router_.decodeData(...) it will return a response code, which we currently ignore, that tells you whether to buffer or not. Buffering is only done in certain cases, for example, if we might retry, or shadow. In this case, the "connection manager" (AsyncClient) is expected to buffer the entire request.
  2. In the streaming case, it doesn't make sense to buffer. Therefore, I would probably check the return code of router_.decodeData(...) and, if it asks for buffering, throw an exception of some kind to indicate programmer error.

So basically, for the streaming case I think decodingBuffer() should return nullptr; in the normal case, it should return the request body.

Member: Sorry, just to further clarify: buffering should continue to work in the AsyncRequestImpl case (non-streaming).

Member Author: Done.

closeLocal(end_stream);
}
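
A sketch of the buffering guard described in the thread above, assuming router_.decodeData() returns Http::FilterDataStatus as declared in include/envoy/http/filter.h; whether to assert or throw on a buffering request is left open in the discussion, and this is illustrative rather than the code in this diff:

void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
  // Illustrative only: in the streaming case there is nothing to buffer, so a
  // request from the router to buffer indicates a programming error.
  if (router_.decodeData(data, end_stream) == Http::FilterDataStatus::StopIterationAndBuffer) {
    // Programmer error per the discussion above: assert or throw here.
    ASSERT(false);
  }
  closeLocal(end_stream);
}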

void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
router_.decodeTrailers(trailers);
closeLocal(true);
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
ASSERT(!(local_closed_ && end_stream));

local_closed_ |= end_stream;

Reviewer: debug assertion if local is already closed and end_stream = true.

Member Author: Done.

if (complete())
Member: nit: braces around all if statements.

cleanup();
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete())
Member: nit: braces around all if statements.

cleanup();
}

void AsyncStreamImpl::reset() {
reset_callback_();
resetStream();
}

void AsyncRequestImpl::cleanup() {
response_.reset();
void AsyncStreamImpl::cleanup() {
reset_callback_ = nullptr;

// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (inserted()) {
removeFromList(parent_.active_requests_);
removeFromList(parent_.active_streams_);
}
}

void AsyncRequestImpl::resetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
void AsyncStreamImpl::resetStream() {
stream_callbacks_.onReset();
cleanup();
}

void AsyncRequestImpl::failDueToClientDestroy() {
// In this case we are going away because the client is being destroyed. We need to both reset
// the stream as well as raise a failure callback.
reset_callback_();
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onReset() {
if (!cancelled_) {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
}
}

void AsyncRequestImpl::cancel() {
cancelled_ = true;
reset();
}

} // Http