Extend AsyncClient to support streaming requests / responses #353

Merged: 22 commits merged into envoyproxy:master on Feb 2, 2017

Conversation

@lizan (Member) commented Jan 13, 2017

  • Refactor AsyncRequestImpl into AsyncStreamImpl and add streaming requests / responses
  • Implement AsyncRequestImpl based on AsyncStreamImpl

Existing AsyncClientImplTest covers most of AsyncStreamImpl. Working on more tests for streams.

@qiwzhang PTAL.

Closes #317.

@mattklein123 (Member)

@lizan I'm OOO tomorrow, will take a look next week when all tests are in place. At a very high level the approach is fine.

@@ -294,6 +294,14 @@ class MockAsyncClient : public AsyncClient {

MOCK_METHOD3(send_, Request*(MessagePtr& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout));

Stream* start(StreamCallbacks& callbacks,
Member

no need to override the method, you can simply do
MOCK_METHOD2(start, Stream*(StreamCallbacks& callbacks,
             const Optional<std::chrono::milliseconds>& timeout));
and this should work just fine

public:
AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout);
virtual ~AsyncRequestImpl();
Member

hm, AsyncRequestImpl is final, so there should not be a need for a virtual destructor
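A minimal generic illustration of this point (plain C++, not the Envoy classes themselves):

#include <memory>

struct Base {
  virtual ~Base() = default; // polymorphic base: destruction via Base* is safe
};

// `final` means nothing can derive from Derived, so spelling out `virtual` on
// its destructor adds nothing: it is already virtual via Base, and there is no
// further-derived type that could be left partially destroyed.
struct Derived final : Base {
  ~Derived() = default;
};

int main() {
  std::unique_ptr<Base> p = std::make_unique<Derived>(); // destroys correctly
  return 0;
}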

Router::ProdFilter router_;
std::function<void()> reset_callback_;
AccessLog::RequestInfoImpl request_info_;
RouteImpl route_;
bool complete_{};
Buffer::Instance* decoding_buffer_{nullptr};
bool internal_header_inserted_{false};
Member

nit: bool internal_header_inserted_{};
same for * decoding_buffer_{};

@@ -50,6 +83,39 @@ class AsyncClient {
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream
Member

super nit: full stop here and down below for comments on methods.

AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
AsyncStreamImpl* async_request = new AsyncStreamImpl(*this, callbacks, timeout);
std::unique_ptr<AsyncStreamImpl> new_request{async_request};
Member

nit: combine into one line:
std::unique_ptr<AsyncStreamImpl> new_request(new AsyncStreamImpl(*this, callbacks, timeout));

headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
}
Member

nit: new line in between

@qiwzhang (Contributor)

This is cool. Just wondering if this stream can be shared by multiple threads: 1) Can multiple threads write data to this stream? 2) When response data is received, on which thread will the callbacks be invoked?

@mattklein123 (Member)

@qiwzhang no, as written, AsyncClient is thread local and cannot be safely shared between threads. It would be possible to surround it with enough locking to make it safe, but my general recommendation would be to reframe your problem as a thread local problem. I have not yet found a problem that cannot be solved cleanly with this pattern. Happy to discuss this offline with you.

@qiwzhang (Contributor)

@mattklein123 Thanks for your quick response. In this case, istio/proxy will need to create N gRPC streams to istio/mixer if it has N threads. I feel it would be better if they could share the same gRPC stream, since some transport optimizations of the Mixer API are done based on the gRPC stream. We will go with one stream per thread for now to make things easier, and will revisit this later if performance becomes an issue.

@mattklein123 (Member)

@lizan can you tag me when the tests are finished and you have addressed Roman's comments? Then I will take a look.

@lizan (Member Author) commented Jan 18, 2017

@mattklein123 sure, I'm making more changes since I discovered some problems in the tests. In the meantime it would be nice if you could take a look at the interfaces (i.e. async_client.h).

@mattklein123 (Member)

@lizan sure will do.

/***
* Close the stream.
*/
virtual void close() PURE;
Member

What does this do exactly? The above methods implicitly send end_stream. Is this a reset? If so can we call it reset() and provide a reset reason?

Member Author

This is comparable to Request::cancel(); its implementation essentially resets the stream and cleans up. I can rename it to reset().

The underlying filter's (i.e. router_'s) interface doesn't have a reset reason, so I didn't provide one here. Do you think it is needed?

Member

Sorry you are right, I just looked at the code again. The fact that there is no reason is due to how we are wrapping the router filter to do all of this. I would just rename to reset() for clarity since IMO that makes more sense in the streaming case.
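A sketch of what the renamed method could look like on the Stream interface (wording assumed, not copied from the merged header):

/**
 * Reset the stream. No further callbacks will be invoked on the
 * StreamCallbacks after this call.
 */
virtual void reset() PURE;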

* the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
Member

It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long lived streaming API requests like you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though).

Member Author

I don't have a strong opinion either. I will update comments soon.

Member Author

Done

Contributor

Explicitly say that if no timeout is given, it's infinite.

@louiscryan commented Jan 24, 2017

What is the behavior if the timeout is hit? Does it implicitly call Stream::reset(), and by implication does Callbacks::onResetStream get called, and on what thread?

Member

This is all single-threaded. Yes, when the timeout is hit the stream will be reset and the callbacks called.

Member Author

On timeout, it will call StreamCallbacks::onHeaders with :status 504 and then onData; the headers and data come from the underlying filters.

See this test:
https://github.com/lyft/envoy/pull/353/files#diff-918cf37446b66e452d1114b88b1e61c4R578
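A rough sketch of what such a timeout test might look like (mock, fixture, and helper names are assumed from the fragments quoted in this review, not taken from the merged test):

// using testing::_, testing::NiceMock from gmock
NiceMock<MockAsyncClientStreamCallbacks> stream_callbacks;

// On timeout the router synthesizes the response, so the caller sees normal
// onHeaders/onData callbacks (with ":status 504" in the headers) rather than
// an onResetStream callback.
EXPECT_CALL(stream_callbacks, onHeaders_(_, false));
EXPECT_CALL(stream_callbacks, onData_(_, true));

AsyncClient::Stream* stream =
    client_.start(stream_callbacks, Optional<std::chrono::milliseconds>(40));
stream->sendHeaders(headers, false);
// ... then let the simulated timeout timer fire ...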

@lizan (Member Author) commented Jan 19, 2017

@mattklein123 Thanks for reviewing it, and sorry I was busy on other things today, will update the PR with tests soon.

@lizan (Member Author) commented Jan 22, 2017

@mattklein123 This is ready to review, PTAL. Thanks!

@mattklein123 (Member)

@RomanDzhabarov please review. I will take a final pass once you are done.

@RomanDzhabarov (Member)

I'll take another pass on this today.

virtual ~Stream() {}

/***
* Send headers to the stream.
Contributor

Explicitly say that the sendHeaders method cannot be invoked more than once?

Member Author

Done

Router::ProdFilter router_;
std::function<void()> reset_callback_;
AccessLog::RequestInfoImpl request_info_;
RouteImpl route_;
bool complete_{};
Buffer::Instance* decoding_buffer_{nullptr};
Member

nit: decoding_buffer_{}

Member Author

Done

void onResetStream() override { onResetStream_(); }

MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(onData_, void(Buffer::Instance& data, bool end_stream));
Member

"void onData(Buffer::Instance& data, bool end_stream) override { onData_(data, end_stream); }" is not required, just define MOCK_METHOD2 properly for onData.

Member Author

Done

}
void onData(Buffer::Instance& data, bool end_stream) override { onData_(data, end_stream); }
void onTrailers(HeaderMapPtr&& trailers) override { onTrailers_(*trailers); }
void onResetStream() override { onResetStream_(); }
Member

ditto

Member Author

done

@@ -317,6 +338,19 @@ class MockAsyncClientRequest : public AsyncClient::Request {

MockAsyncClient* client_;
};

class MockAsyncClientStream : public AsyncClient::Stream {
Member

Where is this used at all? Same question for the other added mocks.

Member Author

This mock is not used; removed. Which others do you mean? MockAsyncClientStreamCallbacks is used in AsyncClientImplTest.

@@ -38,7 +38,40 @@ class AsyncClient {
};

/**
* An in-flight HTTP request
* Notifies caller of async HTTP stream status.

It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.

Member Author

Done
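A possible shape for that exposition on the Stream interface (wording is assumed, not the merged comment):

/**
 * An in-flight HTTP stream. Streams are full duplex: StreamCallbacks can
 * continue to receive events (onData, onTrailers, onReset) even after
 * sendHeaders()/sendData()/sendTrailers() has been called with
 * end_stream = true on the sending side.
 */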

}

void AsyncStreamImpl::closeLocal(bool end_stream) {
local_closed_ |= end_stream;

Add a debug assertion for the case where local is already closed and end_stream = true.

Member Author

Done


void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

@mattklein123 this is some odd nomenclature, why are headers received from the transport in a method called 'encode...' ?

Member

I explained this above: this is an internal implementation detail. The response is being "encoded" into the client, which then dispatches the appropriate callbacks.

@mattklein123 (Member)

@lizan there are a huge number of comments on this review. I'm fine with the overall approach. When everyone's concerns are satisfied, or if there are other questions, let me know and I can review in detail.

@lizan (Member Author) commented Jan 27, 2017

@fengli79 @louiscryan @RomanDzhabarov PTAL again, thanks!

}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
AsyncRequestImpl* async_request =
Member

nit: just initialize this directly in unique_ptr and return front().get() like you do below.

Member Author

The raw pointer here is to avoid downcasting from AsyncStreamImpl to AsyncRequestImpl on return; AsyncStreamImpl is not a subclass of AsyncClient::Request.

Member

ok sorry sounds good

ASSERT(!(local_closed_ && end_stream));

local_closed_ |= end_stream;
if (complete())
Member

nit: braces around all if statements


void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete())
Member

nit: braces around all if statements

void onTrailers(HeaderMapPtr&& trailers) override;
void onReset() override;
void onComplete();
MessagePtr request_;
Member

nit: newline before this line

void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onReset() override;
void onComplete();
Member

nit: move to top of section

virtual void cancel() override;

private:
void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
Member

prefix overrides with // AsyncClient::StreamCallbacks

cleanup();
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
decoding_buffer_ = &data;
Member

This is not quite right and is kind of complicated. In short:

  1. When you call router_.decodeData(...) it will return a response code which we currently ignore which tells you whether to buffer or not. Buffering is only done in certain cases, for example, if we might retry, or shadow. In this case, the "connection manager" (AsyncClient) is expected to buffer the entire request.
  2. In the streaming case, it doesn't make sense to buffer. Therefore, I would probably check the return code of router_.decodeData(...) and if it asks for buffering, throw an exception of some kind to indicate programmer error.

So basically, for the streaming case I think decodingBuffer() should return nullptr; in the normal case, it should return the request body.

Member

Sorry just to further clarify, buffering should continue to work in the AsyncRequestImpl case (non-streaming).

Member Author

Done

if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
if (!new_request->complete()) {
Member

If I'm reading the code correctly, I think you actually want to check for remote_close_ here now, not complete_, or, arguably better, you need to set local_close_ in resetStream().

The reason for this is that the router might respond right away. If it responds before the request is complete, it will reset. In this case, you should return a null handle.

Member

Sorry, just looked at the code in detail (I haven't looked at this stuff in a while). The router will not actually call resetStream() on its own; it relies on the connection manager to reset the request when the response is complete. (The main server makes the assumption that even in the streaming case, if the response is finished, the request should be reset if it is not finished.)

So, in this case, I would probably just check for remote_close_ and if that is set, return a null handle and do not send any more data. You could also replicate the connection manager behavior, and if remote is closed before local is closed, close local and "reset."

Member

P.S., this stuff is pretty complicated. If you want to chat in realtime find me in the public Gitter.

Member Author

Looked at the code again; checking remote_close_ sounds reasonable. Done.
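A rough sketch of that check in AsyncClientImpl::send() (the accessor name and exact structure are assumed; the merged code may differ):

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
                                            const Optional<std::chrono::milliseconds>& timeout) {
  std::unique_ptr<AsyncRequestImpl> new_request{
      new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};

  // If the router already finished (remote-closed) the response while the
  // request was being sent, the stream is dead: hand back a null handle so the
  // caller cannot touch it.
  if (!new_request->remoteClosed()) { // hypothetical accessor for remote_closed_
    new_request->moveIntoList(std::move(new_request), active_requests_);
    return active_requests_.front().get();
  } else {
    return nullptr;
  }
}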

@lizan (Member Author) commented Feb 1, 2017

@mattklein123 Sorry for the delay, PTAL when you have time, thanks!

FilterDataStatus status = router_.decodeData(data, end_stream);

if (status == FilterDataStatus::StopIterationAndBuffer) {
decoding_buffer_ = &data;
Member

This is still not right. To properly support buffering you actually need to buffer the data: for example, have a Buffer::OwnedImpl and add each data frame to it. I don't think this is worth supporting, as I don't see any reasonable case where a streaming request would need buffering. I would refactor the code slightly so that in the base streaming case you don't support buffering (throw an exception if something results in buffering being needed), while in the non-streaming case you do what we did previously, which is to just return the request's body data.

Member Author

Done; refactored it to throw an exception in AsyncStreamImpl and return the body in AsyncRequestImpl.

@@ -164,7 +164,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
AccessLog::RequestInfo& requestInfo() override { return request_info_; }
const std::string& downstreamAddress() override { return EMPTY_STRING; }
void continueDecoding() override { NOT_IMPLEMENTED; }
const Buffer::Instance* decodingBuffer() override { return decoding_buffer_; }
const Buffer::Instance* decodingBuffer() override { NOT_IMPLEMENTED; }
Member

this is going to just crash. Can you throw an actual EnvoyException (which will also end up crashing the process but indirectly) and add a test for this case please.

Member Author

Done
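Pulling this thread together, the resolved split looks roughly like this (a sketch with an illustrative member name for the request body; not copied from the merged code):

// Streaming case: buffering indicates a programming error, so throw.
const Buffer::Instance* AsyncStreamImpl::decodingBuffer() {
  throw EnvoyException("buffering is not supported for streaming requests");
}

// Non-streaming case: the request object already holds the full body, so it
// can simply be handed back for retry/shadow re-reads.
const Buffer::Instance* AsyncRequestImpl::decodingBuffer() {
  return request_body_.get(); // illustrative member holding the buffered body
}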

@mattklein123 mattklein123 merged commit 14d3e97 into envoyproxy:master Feb 2, 2017
mattklein123 added commits that referenced this pull request on Feb 2, 2017
@lizan lizan mentioned this pull request Jun 13, 2017
jpsim pushed commits that referenced this pull request on Nov 28 and Nov 29, 2022
Description: this PR fixes a couple bugs to allow e2e use of the Envoy Mobile library in the swift iOS demo.
Risk Level: low - fixing bugs
Testing: ./bazelw build --config=ios --xcode_version=10.3.0.10G8 //:ios_dist and then ./bazelw run --config=ios --xcode_version=10.3.0.10G8 //examples/swift/hello_world:app

Signed-off-by: Jose Nino <jnino@lyft.com>
Signed-off-by: JP Simard <jp@jpsim.com>