[#4789] Remove single RPC timeout
Summary:
We were using a per-call timeout on each single RPC to identify a dead node and retry the request on another one.
Now that connection-level detection of dead connections is in place, this single-RPC timeout is no longer needed:
when a node dies, the broken connection is detected, all RPCs on it are aborted, and the higher-level operation can be retried.

Test Plan: Jenkins

Reviewers: timur

Reviewed By: timur

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8686
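For illustration only (not part of the original commit message or the yb codebase), the following self-contained C++ sketch contrasts the old and new per-attempt timeout logic of RpcRetrier::PrepareController in src/yb/rpc/rpc.cc, using plain std::chrono types as stand-ins for yb's MonoDelta and CoarseMonoClock: previously each attempt was capped by FLAGS_retryable_rpc_single_call_timeout_ms (2500 ms by default), while after this change each attempt may use all of the time remaining before the operation deadline, relying on dead-connection detection to abort early.

#include <algorithm>
#include <chrono>
#include <iostream>

// Illustration only: plain std::chrono stand-ins for yb's MonoDelta / CoarseMonoClock.
using Clock  = std::chrono::steady_clock;
using Millis = std::chrono::milliseconds;

// Hypothetical stand-in for the removed FLAGS_retryable_rpc_single_call_timeout_ms (default 2500 ms).
constexpr Millis kSingleCallTimeout{2500};

// Old behavior: each attempt was capped at min(time left before the overall deadline, single-call timeout).
Millis OldPerAttemptTimeout(Clock::time_point deadline) {
  const Millis remaining = std::chrono::duration_cast<Millis>(deadline - Clock::now());
  return std::min(remaining, kSingleCallTimeout);
}

// New behavior: each attempt gets all remaining time; a detected dead connection aborts the call sooner.
Millis NewPerAttemptTimeout(Clock::time_point deadline) {
  return std::chrono::duration_cast<Millis>(deadline - Clock::now());
}

int main() {
  const Clock::time_point deadline = Clock::now() + std::chrono::seconds(60);
  std::cout << "old per-attempt timeout: " << OldPerAttemptTimeout(deadline).count() << " ms\n"
            << "new per-attempt timeout: " << NewPerAttemptTimeout(deadline).count() << " ms\n";
  return 0;
}

With a 60-second operation deadline, the old computation would still cap each attempt at 2.5 seconds, while the new one lets a single attempt run until the deadline unless the connection is reported dead.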
spolitov committed Jun 30, 2020
1 parent ab0bc32 commit 4142c34
Showing 20 changed files with 41 additions and 107 deletions.
4 changes: 2 additions & 2 deletions ent/src/yb/cdc/cdc_rpc.cc
@@ -90,7 +90,7 @@ class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
private:
void SendRpcToTserver(int attempt_num) override {
InvokeAsync(invoker_.proxy().get(),
PrepareController(invoker_.client().default_rpc_timeout()),
PrepareController(),
std::bind(&CDCWriteRpc::Finished, this, Status::OK()));
}

@@ -219,7 +219,7 @@ class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc {
&invoker_.client().proxy_cache(), invoker_.ProxyEndpoint());

InvokeAsync(cdc_proxy_.get(),
PrepareController(invoker_.client().default_rpc_timeout()),
PrepareController(),
std::bind(&CDCReadRpc::Finished, this, Status::OK()));
}

6 changes: 1 addition & 5 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
@@ -189,11 +189,7 @@ protected static int getPerfMaxRuntime(int releaseRuntime,
}

protected Map<String, String> getMasterAndTServerFlags() {
Map<String, String> flagMap = new TreeMap<>();
flagMap.put(
"retryable_rpc_single_call_timeout_ms",
String.valueOf(getRetryableRpcSingleCallTimeoutMs()));
return flagMap;
return new TreeMap<>();
}

protected Integer getYsqlPrefetchLimit() {
22 changes: 10 additions & 12 deletions src/yb/client/async_rpc.cc
@@ -105,7 +105,7 @@ AsyncRpcMetrics::AsyncRpcMetrics(const scoped_refptr<yb::MetricEntity>& entity)
time_to_send(METRIC_handler_latency_yb_client_time_to_send.Instantiate(entity)) {
}

AsyncRpc::AsyncRpc(AsyncRpcData* data, YBConsistencyLevel yb_consistency_level, MonoDelta timeout)
AsyncRpc::AsyncRpc(AsyncRpcData* data, YBConsistencyLevel yb_consistency_level)
: Rpc(data->batcher->deadline(), data->batcher->messenger(), &data->batcher->proxy_cache()),
batcher_(data->batcher),
trace_(new Trace),
@@ -119,8 +119,7 @@ AsyncRpc::AsyncRpc(AsyncRpcData* data, YBConsistencyLevel yb_consistency_level,
trace_.get()),
ops_(std::move(data->ops)),
start_(MonoTime::Now()),
async_rpc_metrics_(data->batcher->async_rpc_metrics()),
timeout_(timeout) {
async_rpc_metrics_(data->batcher->async_rpc_metrics()) {

mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread(
data->allow_local_calls_in_curr_thread);
@@ -271,9 +270,8 @@ void AsyncRpc::SendRpcToTserver(int attempt_num) {

template <class Req, class Resp>
AsyncRpcBase<Req, Resp>::AsyncRpcBase(AsyncRpcData* data,
YBConsistencyLevel consistency_level,
MonoDelta timeout)
: AsyncRpc(data, consistency_level, timeout) {
YBConsistencyLevel consistency_level)
: AsyncRpc(data, consistency_level) {

req_.set_tablet_id(tablet_invoker_.tablet()->tablet_id());
req_.set_include_trace(IsTracingEnabled());
@@ -350,8 +348,8 @@ void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
AsyncRpc::SendRpcToTserver(attempt_num);
}

WriteRpc::WriteRpc(AsyncRpcData* data, MonoDelta timeout)
: AsyncRpcBase(data, YBConsistencyLevel::STRONG, timeout) {
WriteRpc::WriteRpc(AsyncRpcData* data)
: AsyncRpcBase(data, YBConsistencyLevel::STRONG) {

TRACE_TO(trace_, "WriteRpc initiated to $0", data->tablet->tablet_id());

@@ -444,7 +442,7 @@ void WriteRpc::CallRemoteMethod() {
ADOPT_TRACE(trace.get());

tablet_invoker_.proxy()->WriteAsync(
req_, &resp_, PrepareController(timeout_),
req_, &resp_, PrepareController(),
std::bind(&WriteRpc::Finished, this, Status::OK()));
TRACE_TO(trace, "RpcDispatched Asynchronously");
}
@@ -581,8 +579,8 @@ void WriteRpc::ProcessResponseFromTserver(const Status& status) {
SwapRequestsAndResponses(false);
}

ReadRpc::ReadRpc(AsyncRpcData* data, YBConsistencyLevel yb_consistency_level, MonoDelta timeout)
: AsyncRpcBase(data, yb_consistency_level, timeout) {
ReadRpc::ReadRpc(AsyncRpcData* data, YBConsistencyLevel yb_consistency_level)
: AsyncRpcBase(data, yb_consistency_level) {

TRACE_TO(trace_, "ReadRpc initiated to $0", data->tablet->tablet_id());
req_.set_consistency_level(yb_consistency_level);
@@ -658,7 +656,7 @@ void ReadRpc::CallRemoteMethod() {
ADOPT_TRACE(trace.get());

tablet_invoker_.proxy()->ReadAsync(
req_, &resp_, PrepareController(timeout_),
req_, &resp_, PrepareController(),
std::bind(&ReadRpc::Finished, this, Status::OK()));
TRACE_TO(trace, "RpcDispatched Asynchronously");
}
13 changes: 4 additions & 9 deletions src/yb/client/async_rpc.h
@@ -73,7 +73,7 @@ struct FlushExtraResult {
// This class deletes itself after Rpc returns and is processed.
class AsyncRpc : public rpc::Rpc, public TabletRpc {
public:
explicit AsyncRpc(AsyncRpcData* data, YBConsistencyLevel consistency_level, MonoDelta timeout);
explicit AsyncRpc(AsyncRpcData* data, YBConsistencyLevel consistency_level);

virtual ~AsyncRpc();

@@ -119,16 +119,12 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc {
MonoTime start_;
std::shared_ptr<AsyncRpcMetrics> async_rpc_metrics_;
rpc::RpcCommandPtr retained_self_;

// Amount of time to wait for a given op, from start to finish.
MonoDelta timeout_;
};

template <class Req, class Resp>
class AsyncRpcBase : public AsyncRpc {
public:
explicit AsyncRpcBase(AsyncRpcData* data, YBConsistencyLevel consistency_level,
MonoDelta timeout);
explicit AsyncRpcBase(AsyncRpcData* data, YBConsistencyLevel consistency_level);

const Resp& resp() const { return resp_; }
Resp& resp() { return resp_; }
@@ -155,7 +151,7 @@ class AsyncRpcBase : public AsyncRpc {

class WriteRpc : public AsyncRpcBase<tserver::WriteRequestPB, tserver::WriteResponsePB> {
public:
explicit WriteRpc(AsyncRpcData* data, MonoDelta timeout = MonoDelta::kZero);
explicit WriteRpc(AsyncRpcData* data);

virtual ~WriteRpc();

@@ -168,8 +164,7 @@ class WriteRpc : public AsyncRpcBase<tserver::WriteRequestPB, tserver::WriteResp
class ReadRpc : public AsyncRpcBase<tserver::ReadRequestPB, tserver::ReadResponsePB> {
public:
explicit ReadRpc(
AsyncRpcData* data, YBConsistencyLevel yb_consistency_level = YBConsistencyLevel::STRONG,
MonoDelta timeout = MonoDelta::kZero);
AsyncRpcData* data, YBConsistencyLevel yb_consistency_level = YBConsistencyLevel::STRONG);

virtual ~ReadRpc();

14 changes: 3 additions & 11 deletions src/yb/client/batcher.cc
@@ -165,12 +165,6 @@ void Batcher::SetTimeout(MonoDelta timeout) {
timeout_ = timeout;
}

void Batcher::SetSingleRpcTimeout(MonoDelta timeout) {
CHECK_GE(timeout, MonoDelta::kZero);
std::lock_guard<decltype(mutex_)> lock(mutex_);
single_rpc_timeout_ = timeout;
}

bool Batcher::HasPendingOperations() const {
std::lock_guard<decltype(mutex_)> lock(mutex_);
return !ops_.empty();
@@ -636,13 +630,11 @@ std::shared_ptr<AsyncRpc> Batcher::CreateRpc(
hybrid_time_for_write_, std::move(ops)};
switch (op_group) {
case OpGroup::kWrite:
return std::make_shared<WriteRpc>(&data, single_rpc_timeout_);
return std::make_shared<WriteRpc>(&data);
case OpGroup::kLeaderRead:
return std::make_shared<ReadRpc>(&data, YBConsistencyLevel::STRONG, single_rpc_timeout_);
return std::make_shared<ReadRpc>(&data, YBConsistencyLevel::STRONG);
case OpGroup::kConsistentPrefixRead:
return std::make_shared<ReadRpc>(&data,
YBConsistencyLevel::CONSISTENT_PREFIX,
single_rpc_timeout_);
return std::make_shared<ReadRpc>(&data, YBConsistencyLevel::CONSISTENT_PREFIX);
}
FATAL_INVALID_ENUM_VALUE(OpGroup, op_group);
}
5 changes: 0 additions & 5 deletions src/yb/client/batcher.h
@@ -123,8 +123,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// may time out before even sending an op). TODO: implement that
void SetTimeout(MonoDelta timeout);

void SetSingleRpcTimeout(MonoDelta timeout);

// Add a new operation to the batch. Requires that the batch has not yet been flushed.
// TODO: in other flush modes, this may not be the case -- need to
// update this when they're implemented.
@@ -299,9 +297,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// Set by SetTimeout.
MonoDelta timeout_;

// Timeout for the rpc.
MonoDelta single_rpc_timeout_;

// After flushing, the absolute deadline for all in-flight ops.
CoarseTimePoint deadline_;

2 changes: 1 addition & 1 deletion src/yb/client/meta_cache.cc
@@ -600,7 +600,7 @@ void LookupRpc::SendRpc() {
Finished(STATUS(TimedOut, "timed out after deadline expired"));
return;
}
mutable_retrier()->PrepareController(MonoDelta());
mutable_retrier()->PrepareController();

DoSendRpc();
}
2 changes: 0 additions & 2 deletions src/yb/client/ql-tablet-test.cc
@@ -60,7 +60,6 @@ DECLARE_int32(leader_lease_duration_ms);
DECLARE_int64(db_write_buffer_size);
DECLARE_string(time_source);
DECLARE_int32(TEST_delay_execute_async_ms);
DECLARE_int64(retryable_rpc_single_call_timeout_ms);
DECLARE_int32(retryable_request_timeout_secs);
DECLARE_bool(enable_lease_revocation);
DECLARE_bool(rocksdb_disable_compactions);
@@ -757,7 +756,6 @@ TEST_F(QLTabletTest, LeaderChange) {
req->mutable_if_expr()->mutable_condition(), kValueColumn, QL_OP_EQUAL, kValue1);
req->mutable_column_refs()->add_ids(table.ColumnId(kValueColumn));
ASSERT_OK(session->Apply(write_op));
FLAGS_retryable_rpc_single_call_timeout_ms = 60000;

SetAtomicFlag(30000, &FLAGS_TEST_delay_execute_async_ms);
auto flush_future = session->FlushFuture();
7 changes: 3 additions & 4 deletions src/yb/client/ql-transaction-test.cc
@@ -56,7 +56,7 @@ DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(transaction_disable_proactive_cleanup_in_tests);
DECLARE_uint64(aborted_intent_cleanup_ms);
DECLARE_int32(remote_bootstrap_max_chunk_size);
DECLARE_int32(TEST_master_inject_latency_on_transactional_tablet_lookups_ms);
DECLARE_bool(TEST_master_fail_transactional_tablet_lookups);
DECLARE_int64(transaction_rpc_timeout_ms);
DECLARE_bool(rocksdb_disable_compactions);
DECLARE_int32(TEST_delay_init_tablet_peer_ms);
@@ -113,13 +113,12 @@ TEST_F(QLTransactionTest, Simple) {
}

TEST_F(QLTransactionTest, LookupTabletFailure) {
FLAGS_TEST_master_inject_latency_on_transactional_tablet_lookups_ms =
TransactionRpcTimeout().ToMilliseconds() + 500;
FLAGS_TEST_master_fail_transactional_tablet_lookups = true;

auto txn = CreateTransaction();
auto result = WriteRow(CreateSession(txn), 0 /* key */, 1 /* value */);

ASSERT_TRUE(!result.ok() && result.status().IsTimedOut()) << "Result: " << result;
ASSERT_TRUE(!result.ok() && result.status().IsTimedOut()) << "Result: " << AsString(result);
}

TEST_F(QLTransactionTest, ReadWithTimeInFuture) {
11 changes: 0 additions & 11 deletions src/yb/client/session.cc
@@ -111,14 +111,6 @@ void YBSession::SetTimeout(MonoDelta timeout) {
}
}

void YBSession::SetSingleRpcTimeout(MonoDelta timeout) {
CHECK_GE(timeout, MonoDelta::kZero);
single_rpc_timeout_ = timeout;
if (batcher_) {
batcher_->SetSingleRpcTimeout(timeout);
}
}

Status YBSession::Flush() {
Synchronizer s;
FlushAsync(s.AsStatusFunctor());
@@ -209,9 +201,6 @@ internal::Batcher& YBSession::Batcher() {
if (timeout_.Initialized()) {
batcher_->SetTimeout(timeout_);
}
if (single_rpc_timeout_.Initialized()) {
batcher_->SetSingleRpcTimeout(single_rpc_timeout_);
}
batcher_->SetRejectionScoreSource(rejection_score_source_);
if (hybrid_time_for_write_.is_valid()) {
batcher_->SetHybridTimeForWrite(hybrid_time_for_write_);
5 changes: 0 additions & 5 deletions src/yb/client/session.h
@@ -117,8 +117,6 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// Set the timeout for writes made in this session.
void SetTimeout(MonoDelta timeout);

void SetSingleRpcTimeout(MonoDelta timeout);

MonoDelta timeout() const {
return timeout_;
}
@@ -287,9 +285,6 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// Timeout for the next batch.
MonoDelta timeout_;

// Timeout for the rpcs. Initialized only on SetRpcTimeout.
MonoDelta single_rpc_timeout_;

// HybridTime for Write. Used for Index Backfill.
HybridTime hybrid_time_for_write_;

7 changes: 1 addition & 6 deletions src/yb/docdb/pgsql_operation.cc
@@ -32,7 +32,6 @@

DECLARE_bool(trace_docdb_calls);
DECLARE_bool(ysql_disable_index_backfill);
DECLARE_int64(retryable_rpc_single_call_timeout_ms);

DEFINE_double(ysql_scan_timeout_multiplier, 0.5,
"YSQL read scan timeout multipler of retryable_rpc_single_call_timeout_ms.");
@@ -504,9 +503,6 @@ Result<size_t> PgsqlReadOperation::ExecuteScalar(const common::YQLStorageIf& ql_

// Set scan start time.
bool scan_time_exceeded = false;
const int64 scan_time_limit =
FLAGS_retryable_rpc_single_call_timeout_ms * FLAGS_ysql_scan_timeout_multiplier;
const MonoTime start_time = MonoTime::Now();

// Fetching data.
int match_count = 0;
@@ -552,8 +548,7 @@ Result<size_t> PgsqlReadOperation::ExecuteScalar(const common::YQLStorageIf& ql_

// Check every row_count_limit matches whether we've exceeded our scan time.
if (match_count % row_count_limit == 0) {
const MonoDelta elapsed_time = MonoTime::Now().GetDeltaSince(start_time);
scan_time_exceeded = elapsed_time.ToMilliseconds() > scan_time_limit;
scan_time_exceeded = CoarseMonoClock::now() >= deadline;
}
}

19 changes: 11 additions & 8 deletions src/yb/master/master_service.cc
@@ -58,9 +58,8 @@ DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0,
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, unsafe);
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, hidden);

DEFINE_test_flag(int32, master_inject_latency_on_transactional_tablet_lookups_ms, 0,
"Number of milliseconds that the master will sleep before responding to "
"requests for transactional tablet locations.");
DEFINE_test_flag(bool, master_fail_transactional_tablet_lookups, false,
"Whether to fail all lookup requests to transactional table.");

DEFINE_double(master_slow_get_registration_probability, 0,
"Probability of injecting delay in GetMasterRegistration.");
@@ -221,7 +220,7 @@ void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* re
if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
}
if (PREDICT_FALSE(FLAGS_TEST_master_inject_latency_on_transactional_tablet_lookups_ms > 0)) {
if (PREDICT_FALSE(FLAGS_TEST_master_fail_transactional_tablet_lookups)) {
std::vector<scoped_refptr<TableInfo>> tables;
server_->catalog_manager()->GetAllTables(&tables);
const auto& tablet_id = req->tablet_ids(0);
@@ -230,10 +229,14 @@
table->GetAllTablets(&tablets);
for (const auto& tablet : tablets) {
if (tablet->tablet_id() == tablet_id) {
auto lock = table->LockForRead();
if (table->metadata().state().table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
SleepFor(MonoDelta::FromMilliseconds(
FLAGS_TEST_master_inject_latency_on_transactional_tablet_lookups_ms));
TableType table_type;
{
auto lock = table->LockForRead();
table_type = table->metadata().state().table_type();
}
if (table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
rpc.RespondFailure(STATUS(InvalidCommand, "TEST: Artificial failure"));
return;
}
break;
}
10 changes: 2 additions & 8 deletions src/yb/rpc/rpc.cc
@@ -53,8 +53,6 @@ DEFINE_int64(rpcs_shutdown_timeout_ms, 15000 * yb::kTimeMultiplier,
"Timeout for a batch of multiple RPCs invoked in parallel to shutdown.");
DEFINE_int64(rpcs_shutdown_extra_delay_ms, 5000 * yb::kTimeMultiplier,
"Extra allowed time for a single RPC command to complete after its deadline.");
DEFINE_int64(retryable_rpc_single_call_timeout_ms, 2500 * yb::kTimeMultiplier,
"Timeout of single RPC call in retryable RPC command.");
DEFINE_int32(min_backoff_ms_exponent, 7,
"Min amount of backoff delay if the server responds with TOO BUSY (default: 128ms). "
"Set this to some amount, during which the server might have recovered.");
@@ -268,12 +266,8 @@ std::string RpcRetrier::ToString() const {
deadline_);
}

RpcController* RpcRetrier::PrepareController(MonoDelta single_call_timeout) {
if (!single_call_timeout) {
single_call_timeout = MonoDelta::FromMilliseconds(FLAGS_retryable_rpc_single_call_timeout_ms);
}
controller_.set_timeout(std::min<MonoDelta>(
deadline_ - CoarseMonoClock::now(), single_call_timeout));
RpcController* RpcRetrier::PrepareController() {
controller_.set_timeout(deadline_ - CoarseMonoClock::now());
return &controller_;
}

