Extend AsyncClient to support streaming requests / responses #353

Merged Feb 2, 2017. 22 commits. Changes from all commits are shown below.
88 changes: 87 additions & 1 deletion include/envoy/http/async_client.h
@@ -38,7 +38,45 @@ class AsyncClient {
};

/**
* An in-flight HTTP request
* Notifies caller of async HTTP stream status.

It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.

Member Author

Done
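
A minimal sketch of a StreamCallbacks implementation that keeps consuming response events after the request side has been half-closed; class and member names are illustrative and not part of this PR, and include paths are assumed from the repository layout:

#include <cstdint>

#include "envoy/buffer/buffer.h"     // Buffer::Instance (path assumed)
#include "envoy/http/async_client.h" // the interface added in this PR

namespace Http {

// Sketch only: the stream is full duplex, so these callbacks stay live after
// the local (request) side was half-closed with end_stream=true, until the
// remote side ends the stream or it is reset.
class CountingStreamCallbacks : public AsyncClient::StreamCallbacks {
public:
  void onHeaders(HeaderMapPtr&& headers, bool end_stream) override {
    response_headers_ = std::move(headers);
    remote_complete_ |= end_stream;
  }

  void onData(Buffer::Instance& data, bool end_stream) override {
    // Can fire many times, including after we already sent our last request frame.
    bytes_received_ += data.length();
    remote_complete_ |= end_stream;
  }

  void onTrailers(HeaderMapPtr&& trailers) override {
    response_trailers_ = std::move(trailers);
    remote_complete_ = true;
  }

  void onReset() override { reset_ = true; }

private:
  HeaderMapPtr response_headers_;
  HeaderMapPtr response_trailers_;
  uint64_t bytes_received_{0};
  bool remote_complete_{false};
  bool reset_{false};
};

} // Http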

* Note that the HTTP stream is full duplex: even if the local-to-remote stream has been ended
* by Stream.sendHeaders/sendData with end_stream=true or by sendTrailers,
* StreamCallbacks can continue to receive events until the remote-to-local stream is closed,
* and vice versa.
*/
class StreamCallbacks {
public:
virtual ~StreamCallbacks() {}

/**
* Called when all headers have been received on the async HTTP stream.
* @param headers the headers received
* @param end_stream whether the response is header only
*/
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE;

/**
* Called when a data frame is received on the async HTTP stream.
* This can be invoked multiple times as the data is streamed.
* @param data the data received
* @param end_stream whether the data is the last data frame
*/
virtual void onData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Called when all trailers have been received on the async HTTP stream.
* @param trailers the trailers received.
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when the async HTTP stream is reset.
*/
virtual void onReset() PURE;
};

/**
* An in-flight HTTP request.
*/
class Request {
public:
@@ -50,18 +50,66 @@
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream.
*/
class Stream {
public:
virtual ~Stream() {}

/**
* Send headers to the stream. This method cannot be invoked more than once and
* needs to be called before sendData.
* @param headers supplies the headers to send.
* @param end_stream supplies whether this is a header only request.
*/
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE;

/**
* Send data to the stream. This method can be invoked multiple times as the data is streamed.
* To end the stream without sending more data, call this method with an empty buffer.
* @param data supplies the data to send.
* @param end_stream supplies whether this is the last data.
*/
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Send trailers. This method cannot be invoked more than once, and implicitly ends the stream.
* @param trailers supplies the trailers to send.
*/
virtual void sendTrailers(HeaderMap& trailers) PURE;

/**
* Reset the stream.
*/
virtual void reset() PURE;

Does this call trigger StreamCallbacks.onResetStream, or does that only occur if the RESET is received from the remote side? If we're thread local then it seems like it should call.

Member

It does call, but this is an internal implementation detail of the async client / request, so I don't think we need to expose it from this interface.

Member Author

StreamCallbacks.onReset only occurs if the RESET is received from the remote.

Member Author

Now reset() will call StreamCallbacks.onReset.
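
A small sketch of the behavior agreed above, i.e. a locally initiated reset() also drives StreamCallbacks.onReset(); the function name is illustrative and not part of this PR:

#include "envoy/http/async_client.h"

namespace Http {

// Sketch only: abandon an in-flight stream. After this change, reset() invokes
// StreamCallbacks::onReset() inline (everything is single threaded), the same
// notification used for remote resets and timeouts, so callers need only one
// teardown path.
void abandonStream(AsyncClient::Stream& stream) {
  stream.reset(); // StreamCallbacks::onReset() runs before this call returns.
}

} // Http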

};

virtual ~AsyncClient() {}

/**
* Send an HTTP request asynchronously.
* @param request the request to send.
* @param callbacks the callbacks to be notified of request status.
* @param timeout supplies the request timeout.
* @return a request handle or nullptr if no request could be created. NOTE: In this case
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;

/**
* Start an HTTP stream asynchronously.
* @param callbacks the callbacks to be notified of stream status.
* @param timeout supplies the stream timeout, measured from when the frame with the end_stream
* flag is sent until the first response frame is received.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onReset() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
Member

It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long-lived streaming API requests like you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though).

Member Author

I don't have a strong opinion either. I will update comments soon.

Member Author

Done

Contributor

Explicitly say that if no timeout is given, it's infinite.

@louiscryan Jan 24, 2017

What is the behavior if the timeout is hit? Does it implicitly call Stream.reset(), and by implication does Callbacks.onResetStream get called, and on what thread?

Member

This is all single threaded. Yes, when the timeout is hit the stream will be reset with callbacks called.

Member Author

On timeout, it will call StreamCallbacks.onHeaders with :status 504 and then onData; the headers and data come from the underlying filters.

See this test:
https://github.com/lyft/envoy/pull/353/files#diff-918cf37446b66e452d1114b88b1e61c4R578

};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
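
A hedged usage sketch of the new streaming entry point; the wrapper function, its parameters, and the 5-second timeout are illustrative assumptions rather than part of this diff, and include paths are assumed from the repository layout:

#include <chrono>

#include "envoy/buffer/buffer.h"     // Buffer::Instance (path assumed)
#include "envoy/http/async_client.h"

namespace Http {

// Sketch only: drive a streaming request through the new API. Header
// construction is left to the caller.
void startStreamingRequest(AsyncClient& client, AsyncClient::StreamCallbacks& callbacks,
                           HeaderMap& request_headers, Buffer::Instance& body) {
  AsyncClient::Stream* stream = client.start(
      callbacks, Optional<std::chrono::milliseconds>(std::chrono::milliseconds(5000)));
  if (!stream) {
    // Per the interface comment above, onReset() has already been called inline.
    return;
  }

  stream->sendHeaders(request_headers, false); // must be called before sendData()
  stream->sendData(body, true);                // end_stream=true half-closes the local side

  // The stream is full duplex: response events keep arriving on `callbacks`
  // until the remote side ends the stream, it is reset, or the timeout fires.
  // Per the review discussion, a timeout surfaces as a locally generated
  // ":status 504" response (onHeaders/onData) rather than a bare reset.
}

} // Http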
196 changes: 129 additions & 67 deletions source/common/http/async_client_impl.cc
@@ -1,15 +1,14 @@
#include "async_client_impl.h"
#include "headers.h"

namespace Http {

const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>>
AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_;
const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_;
const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_;
AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_;
const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_;
const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
@@ -22,120 +21,183 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
dispatcher_(dispatcher) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_requests_.empty()) {
active_requests_.front()->failDueToClientDestroy();
while (!active_streams_.empty()) {
active_streams_.front()->reset();
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
AsyncRequestImpl* async_request =
Member

nit: just initialize this directly in unique_ptr and return front().get() like you do below.

Member Author

Having a raw pointer here avoids downcasting from AsyncStreamImpl to AsyncRequestImpl when it returns; AsyncStreamImpl is not a subclass of AsyncClient::Request.

Member

ok sorry sounds good

new AsyncRequestImpl(std::move(request), *this, callbacks, timeout);
std::unique_ptr<AsyncStreamImpl> new_request{async_request};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
if (!new_request->remote_closed_) {
new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
return nullptr;
}
}
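
The ownership pattern discussed in the thread above, reduced to a generic sketch with illustrative type names: keep a pointer of the more derived type before ownership moves into a container of the base type, so no downcast is needed on return.

#include <list>
#include <memory>

// Sketch only, not Envoy's types. The list owns objects through the stream/base
// type, but send() must return a pointer to an interface only the derived
// request type implements.
struct StreamBase {
  virtual ~StreamBase() {}
};

struct RequestHandle {
  virtual ~RequestHandle() {}
  virtual void cancel() = 0;
};

struct RequestImpl : StreamBase, RequestHandle {
  void cancel() override {}
};

RequestHandle* createAndTrack(std::list<std::unique_ptr<StreamBase>>& active) {
  RequestImpl* typed = new RequestImpl();                   // keep the typed pointer first...
  active.emplace_front(std::unique_ptr<StreamBase>(typed)); // ...then hand ownership to the list
  return typed; // StreamBase is unrelated to RequestHandle, so front().get() would not work here
}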

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: request_(std::move(request)), parent_(parent), callbacks_(callbacks),
stream_id_(parent.config_.random_.random()), router_(parent.config_),
request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) {
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};

router_.setDecoderFilterCallbacks(*this);
request_->headers().insertEnvoyInternalRequest().value(
Headers::get().EnvoyInternalRequestValues.True);
request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(request_->headers(), !request_->body());
if (!complete_ && request_->body()) {
router_.decodeData(*request_->body(), true);
// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->remote_closed_) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
}

// TODO: Support request trailers.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(parent.config_), request_info_(Protocol::Http11),
route_(parent_.cluster_.name(), timeout) {

router_.setDecoderFilterCallbacks(*this);
// TODO: Correctly set protocol in request info when we support access logging.
}

AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); }
AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); }

void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

@mattklein123 this is some odd nomenclature, why are headers received from the transport in a method called 'encode...'?

Member

I explained this above: this is an internal implementation detail; the response is being "encoded" into the client, which then dispatches the appropriate callbacks.

#ifndef NDEBUG
log_debug("async http request response headers (end_stream={}):", end_stream);
response_->headers().iterate([](const HeaderEntry& header, void*) -> void {
headers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

if (end_stream) {
onComplete();
}
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
log_trace("async http request response data (length={} end_stream={})", data.length(),
end_stream);
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
stream_callbacks_.onData(data, end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
#ifndef NDEBUG
log_debug("async http request response trailers:");
response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void {
trailers->iterate([](const HeaderEntry& header, void*) -> void {
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

onComplete();
stream_callbacks_.onTrailers(std::move(trailers));
closeRemote(true);
}

void AsyncRequestImpl::cancel() {
reset_callback_();
cleanup();
void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
closeLocal(end_stream);
}

void AsyncRequestImpl::onComplete() {
complete_ = true;
callbacks_.onSuccess(std::move(response_));
cleanup();
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
closeLocal(end_stream);
}

void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
router_.decodeTrailers(trailers);
closeLocal(true);
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
ASSERT(!(local_closed_ && end_stream));

local_closed_ |= end_stream;
if (complete()) {
cleanup();
}
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete()) {
cleanup();
}
}

void AsyncRequestImpl::cleanup() {
response_.reset();
void AsyncStreamImpl::reset() {
reset_callback_();
resetStream();
}

void AsyncStreamImpl::cleanup() {
reset_callback_ = nullptr;

// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (inserted()) {
removeFromList(parent_.active_requests_);
removeFromList(parent_.active_streams_);
}
}

void AsyncRequestImpl::resetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
void AsyncStreamImpl::resetStream() {
stream_callbacks_.onReset();
cleanup();
}

void AsyncRequestImpl::failDueToClientDestroy() {
// In this case we are going away because the client is being destroyed. We need to both reset
// the stream as well as raise a failure callback.
reset_callback_();
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onReset() {
if (!cancelled_) {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
}
}

void AsyncRequestImpl::cancel() {
cancelled_ = true;
reset();
}

} // Http
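
The local/remote half-close bookkeeping from AsyncStreamImpl, distilled into a standalone sketch; names mirror the implementation but this is illustrative only, and the real code also clears the reset callback and removes itself from the owning list in cleanup().

// Sketch only: the stream is torn down once *both* directions have seen
// end_stream, mirroring closeLocal()/closeRemote()/complete() above.
struct HalfCloseState {
  bool local_closed{false};  // set when we send end_stream (sendHeaders/sendData/sendTrailers)
  bool remote_closed{false}; // set when we receive end_stream (headers/data/trailers)

  bool complete() const { return local_closed && remote_closed; }

  // Each returns true when the stream has just become complete and should be cleaned up.
  bool closeLocal(bool end_stream) {
    local_closed |= end_stream;
    return complete();
  }
  bool closeRemote(bool end_stream) {
    remote_closed |= end_stream;
    return complete();
  }
};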