Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async client: fix immediate response when request has body #411

Merged
merged 1 commit into from
Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::C
new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
new_request->cleanup();
return nullptr;
}
}

AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->remote_closed_) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
}

AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
Expand Down Expand Up @@ -156,7 +151,7 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
if (!remoteClosed() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void reset() override;

protected:
bool complete() { return local_closed_ && remote_closed_; }
bool remoteClosed() { return remote_closed_; }

AsyncClientImpl& parent_;

private:
Expand Down Expand Up @@ -148,9 +149,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
};

void cleanup();

void closeLocal(bool end_stream);
void closeRemote(bool end_stream);
bool complete() { return local_closed_ && remote_closed_; }

// Http::StreamDecoderFilterCallbacks
void addResetStreamCallback(std::function<void()> callback) override {
Expand Down
16 changes: 16 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,22 @@ TEST_F(AsyncClientImplTest, PoolFailure) {
EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value());
}

TEST_F(AsyncClientImplTest, PoolFailureWithBody) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr);
return nullptr;
}));

expectSuccess(503);
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("hello")});
EXPECT_EQ(nullptr,
client_.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>()));

EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value());
}

TEST_F(AsyncClientImplTest, StreamTimeout) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)
Expand Down