[#10660] [YBASE] Refactor Traces: Define trace in RpcCommand so that it can be used across all defined Rpcs

Summary:
1) Define trace in RpcCommand so that it can be used across all defined RPCs
   - remove trace_ from transaction_rpc.cc because we already have it in the superclass
2) Add begin/end to demarcate the traces received from the server.
3) Plumb through the tracing for write ops. The initial traces in the handler threads were being
   missed earlier.
4) Add traces to SharedLockManager. TO BE FIXED: these traces do not decode keys properly.
5) Add more tracing to the conflict resolution part
6) Add a tracing depth flag for controlling how many levels of children traces are to be printed.
7) Populate trace_buffer even upon write failures. This changes the write failure message to:

2021-11-12 22:57:42.347 UTC [12299] STATEMENT:  UPDATE t SET value = 'NEW' WHERE key = 0
../../src/yb/yql/pgwrapper/pg_mini-test.cc:214: Failure
Failed
Bad status: Network error (yb/yql/pgwrapper/libpq_utils.cc:226): Execute of 'UPDATE t SET value = 'NEW' WHERE key = 0' failed: 7, message: ERROR:  All transparent retries exhausted. Operation failed. Try again.: 84f3da10-d961-4251-97ef-96f197810acf Conflicts with higher priority transaction: 7420b446-874c-4abc-8d77-c51b33f93008
 (pgsql error 40001)
W1112 22:57:43.135255 11959 outbound_call.cc:380] RPC callback for RPC call yb.tserver.TabletServerService.GetTransactionStatus -> { remote: 127.0.0.5:19056 idx: 2 protocol: 0x00007f0a2ba62ea0 -> tcpc } , state=FINISHED_SUCCESS. took 0.135s

Test Plan:
Jenkins.
Run a test with trace collection enabled:
rb -- --cxx-test pg_mini-test --gtest_filter PgMiniTest.FollowerReadsWithWrite --test_args --collect_end_to_end_traces=trust_args --ybclient_print_trace_every_n=1 --test_args --tracing_level=1
and look for the traces in the output.

Example output:

        I1113 06:14:35.862326 16976 async_rpc.cc:160] Write(tablet: 80956799d32543be8c74bbfd97cae8e9, num_ops: 1, num_attempts: 1, txn: ca97464e-1ee3-4dc4-a4a1-aaf746a4efd8, subtxn: [none]) took 8000us. Trace:
        .. 1113 06:14:35.854683 (+     0us) async_rpc.cc:487] WriteRpc initiated
        .. 1113 06:14:35.854683 (+     0us) async_rpc.cc:488] Tablet 80956799d32543be8c74bbfd97cae8e9 table yugabyte.t
        .. 1113 06:14:35.854683 (+     0us) async_rpc.cc:169] SendRpc() called.
        .. 1113 06:14:35.854683 (+     0us) tablet_rpc.cc:101] SelectTabletServer()
        .. 1113 06:14:35.854683 (+     0us) tablet_rpc.cc:153] Selected { uuid: 53a481d52a984d1a8b7e96d249aff21f private: [host: "127.0.0.6" port: 26739] public: [host: "127.0.0.7.ip.yugabyte" port: 26739] cloud_info: placement_cloud: "cloud2" placement_region: "rack3" placement_zone: "zone"
        .. 1113 06:14:35.854683 (+     0us) async_rpc.cc:547] SendRpcToTserver
        .. 1113 06:14:35.855683 (+  1000us) async_rpc.cc:552] RpcDispatched Asynchronously
        .. 1113 06:14:35.861684 (+  6001us) tablet_rpc.cc:308] Done(OK)
        .. 1113 06:14:35.861684 (+     0us) async_rpc.cc:413] ProcessResponseFromTserver(Operation failed. Try again.: ca97464e-1ee3-4dc4-a4a1-aaf746a4efd8 Conflicts with higher priority transaction: 52ec72c4-c941-4f40-96ad-3478a3ca0528 (transaction error 3))
        .. 1113 06:14:35.861684 (+     0us) async_rpc.cc:415] Received from server:
         BEGIN
        .. 1113 06:14:35.855820 (+     0us) inbound_call.cc:90] Created InboundCall
        .. 1113 06:14:35.855820 (+     0us) service_pool.cc:180] Inserting onto call queue
        .. 1113 06:14:35.855820 (+     0us) service_pool.cc:261] Handling call Write
        .. 1113 06:14:35.855820 (+     0us) tablet_service.cc:1591] Start Write
        .. 1113 06:14:35.855820 (+     0us) tablet.cc:2019] AcquireLocksAndPerformDocOperations
        .. 1113 06:14:35.855820 (+     0us) tablet.cc:3776] Acquiring write permit
        .. 1113 06:14:35.856820 (+  1000us) operation_counter.cc:187] ScopedRWOperation Tablet schema
        .. 1113 06:14:35.856820 (+     0us) tablet.cc:3777] Acquiring write permit done
        .. 1113 06:14:35.856820 (+     0us) operation_counter.cc:199] Reset null
        .. 1113 06:14:35.856820 (+     0us) operation_counter.cc:199] Reset null
        .. 1113 06:14:35.856820 (+     0us) operation_counter.cc:187] ScopedRWOperation RocksDB non-abortable read/write operations
        .. 1113 06:14:35.856820 (+     0us) operation_counter.cc:199] Reset null
        .. 1113 06:14:35.856820 (+     0us) shared_lock_manager.cc:255] Locking a batch of 3 keys
        .. 1113 06:14:35.856820 (+     0us) shared_lock_manager.cc:295] Acquired a lock batch of 3 keys
        .. 1113 06:14:35.856820 (+     0us) conflict_resolution.cc:966] ResolveTransactionConflicts
        .. 1113 06:14:35.857821 (+  1001us) conflict_resolution.cc:401] FetchingTransactionStatus for 52ec72c4-c941-4f40-96ad-3478a3ca0528
        .. 1113 06:14:35.857821 (+     0us) running_transaction.cc:210] SendStatusRequest : 52ec72c4-c941-4f40-96ad-3478a3ca0528
        .. 1113 06:14:35.857821 (+     0us) conflict_resolution.cc:973] resolver->Resolve done
        .. 1113 06:14:35.857821 (+     0us) operation_counter.cc:199] Reset null
        .. 1113 06:14:35.857821 (+     0us) operation_counter.cc:199] Reset null
        .. .. Related trace:
        .. .. 1113 06:14:35.857821 (+     0us) transaction_rpc.cc:104] GetTransactionStatus
        .. .. 1113 06:14:35.857821 (+     0us) transaction_rpc.cc:105] tablet_id: "8d36c698a53745d99d0f4a15e6f52206" transaction_id: "R\354r\304\311AO@\226\2554x\243\312\005(" propagated_hybrid_time: 6704267574709948416
        .. .. 1113 06:14:35.857821 (+     0us) transaction_rpc.cc:56] SendRpc
        .. .. 1113 06:14:35.857821 (+     0us) tablet_rpc.cc:101] SelectTabletServer()
        .. .. 1113 06:14:35.857821 (+     0us) tablet_rpc.cc:153] Selected { uuid: 320973da990b4ce7ba3915b1075f0848 private: [host: "127.0.0.2" port: 22188] public: [host: "127.0.0.3.ip.yugabyte" port: 22188] cloud_info: placement_cloud: "cloud1" placement_region: "rack1" placement_zone: "zone"
        .. .. 1113 06:14:35.857821 (+     0us) transaction_rpc.cc:131] InvokeAsync
        .. .. 1113 06:14:35.860821 (+  3000us) transaction_rpc.cc:61] Finished
        .. .. 1113 06:14:35.860821 (+     0us) tablet_rpc.cc:308] Done(OK)
        .. .. 1113 06:14:35.860821 (+     0us) transaction_rpc.cc:124] InvokeCallback
        .. .. Related trace:
        .. .. 1113 06:14:35.857821 (+     0us) outbound_call.cc:165] yb.tserver.TabletServerService.GetTransactionStatus.
        .. .. 1113 06:14:35.857821 (+     0us) reactor.cc:736] Scheduled.
        .. .. 1113 06:14:35.857821 (+     0us) outbound_call.cc:441] Queued.
        .. .. 1113 06:14:35.857821 (+     0us) outbound_call.cc:451] Call Sent.
        .. .. 1113 06:14:35.860821 (+  3000us) outbound_call.cc:392] Response received.
        .. .. 1113 06:14:35.860821 (+     0us) outbound_call.cc:393] from { remote: 127.0.0.3:22188 idx: 7 protocol: 0x00007f529eaa6ea0 -> tcpc }
        .. .. 1113 06:14:35.860821 (+     0us) outbound_call.cc:354] Callback called asynchronously.
         END.
        .. .. Related trace:
        .. .. 1113 06:14:35.854683 (+     0us) outbound_call.cc:165] yb.tserver.TabletServerService.Write.
        .. .. 1113 06:14:35.855683 (+  1000us) reactor.cc:736] Scheduled.
        .. .. 1113 06:14:35.855683 (+     0us) outbound_call.cc:441] Queued.
        .. .. 1113 06:14:35.855683 (+     0us) outbound_call.cc:451] Call Sent.
        .. .. 1113 06:14:35.861684 (+  6001us) outbound_call.cc:392] Response received.
        .. .. 1113 06:14:35.861684 (+     0us) outbound_call.cc:393] from { remote: 127.0.0.7:26739 idx: 0 protocol: 0x00007f3147488cc0 -> tcp }
        .. .. 1113 06:14:35.861684 (+     0us) outbound_call.cc:354] Callback called asynchronously.

Reviewers: hbhanawat, rthallam, rsami

Reviewed By: rsami

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D13938
amitanandaiyer committed Nov 21, 2021
1 parent a90b674 commit 9f63710
Showing 17 changed files with 132 additions and 80 deletions.
11 changes: 3 additions & 8 deletions src/yb/client/async_rpc.cc
@@ -63,8 +63,6 @@ METRIC_DEFINE_counter(server, consistent_prefix_failed_reads,
yb::MetricUnit::kRequests,
"Number of consistent prefix reads that failed to be served by the closest replica.");

DECLARE_bool(collect_end_to_end_traces);

DEFINE_int32(ybclient_print_trace_every_n, 0,
"Controls the rate at which traces from ybclient are printed. Setting this to 0 "
"disables printing the collected traces.");
@@ -84,9 +82,10 @@ DEFINE_bool(detect_duplicates_for_retryable_requests, true,
DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false,
"When true, forward the PGSQL rpcs to the local tServer.");


DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b);

DECLARE_bool(collect_end_to_end_traces);

using namespace std::placeholders;

namespace yb {
@@ -134,7 +133,6 @@ AsyncRpc::AsyncRpc(
const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level)
: Rpc(data.batcher->deadline(), data.batcher->messenger(), &data.batcher->proxy_cache()),
batcher_(data.batcher),
trace_(new Trace),
ops_(data.ops),
tablet_invoker_(LocalTabletServerOnly(ops_),
yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX,
@@ -149,9 +147,6 @@ AsyncRpc::AsyncRpc(
async_rpc_metrics_(data.batcher->async_rpc_metrics()) {
mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread(
data.allow_local_calls_in_curr_thread);
if (Trace::CurrentTrace()) {
Trace::CurrentTrace()->AddChildTrace(trace_.get());
}
}

AsyncRpc::~AsyncRpc() {
@@ -416,7 +411,7 @@ template <class Req, class Resp>
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
if (resp_.has_trace_buffer()) {
TRACE_TO(trace_, "Received from server: $0", resp_.trace_buffer());
TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
}
NotifyBatcher(status);
if (!CommonResponseCheck(status)) {
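The new format string in `ProcessResponseFromTserver` brackets the server-side trace buffer between `BEGIN` and `END.` markers. The sketch below shows, under stated assumptions, how that wrapping behaves and how the bracketed section could be pulled back out of a log. `WrapServerTrace` and `ExtractServerTrace` are hypothetical helpers written for illustration; they are not part of the YugabyteDB code base, and the sketch assumes the server buffer ends with a newline, as it does in the example output.

```cpp
#include <string>

// Hypothetical helper: mirrors the format string
// "Received from server: \n BEGIN\n$0 END." from the diff.
std::string WrapServerTrace(const std::string& server_trace) {
  return "Received from server: \n BEGIN\n" + server_trace + " END.";
}

// Hypothetical helper: copies the text between the first " BEGIN\n" marker
// and the last " END." marker into *server_trace. Returns false when the
// markers are missing or out of order.
bool ExtractServerTrace(const std::string& text, std::string* server_trace) {
  const std::string begin_marker = " BEGIN\n";
  const std::string end_marker = " END.";
  const std::string::size_type begin = text.find(begin_marker);
  if (begin == std::string::npos) return false;
  const std::string::size_type start = begin + begin_marker.size();
  const std::string::size_type end = text.rfind(end_marker);
  if (end == std::string::npos || end < start) return false;
  *server_trace = text.substr(start, end - start);
  return true;
}
```

Explicit markers make it easier to tell where the server's entries stop and the client's own entries resume, which matters because the two sides use different clocks in the example output.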
4 changes: 0 additions & 4 deletions src/yb/client/async_rpc.h
@@ -86,7 +86,6 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc {
std::shared_ptr<const YBTable> table() const;
const RemoteTablet& tablet() const { return *tablet_invoker_.tablet(); }
const InFlightOps& ops() const { return ops_; }
Trace *trace() { return trace_.get(); }

protected:
void Finished(const Status& status) override;
@@ -117,9 +116,6 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc {
// completes, regardless of success or failure.
BatcherPtr batcher_;

// The trace buffer.
scoped_refptr<Trace> trace_;

// Operations which were batched into this RPC.
// These operations are in kRequestSent state.
InFlightOps ops_;
3 changes: 1 addition & 2 deletions src/yb/client/transaction_rpc.cc
@@ -38,7 +38,6 @@ class TransactionRpcBase : public rpc::Rpc, public internal::TabletRpc {
internal::RemoteTablet* tablet,
YBClient* client)
: rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()),
trace_(new Trace),
invoker_(false /* local_tserver_only */,
false /* consistent_prefix */,
client,
@@ -84,7 +83,6 @@ class TransactionRpcBase : public rpc::Rpc, public internal::TabletRpc {
rpc::RpcController* controller,
rpc::ResponseCallback callback) = 0;

TracePtr trace_;
internal::TabletInvoker invoker_;
};

@@ -100,6 +98,7 @@ class TransactionRpc : public TransactionRpcBase {
: TransactionRpcBase(deadline, tablet, client),
callback_(std::move(callback)) {
req_.Swap(req);
TRACE_TO(trace_, Traits::kName);
}

virtual ~TransactionRpc() {}
6 changes: 6 additions & 0 deletions src/yb/docdb/conflict_resolution.cc
@@ -242,12 +242,14 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {

MUST_USE_RESULT bool CheckResolutionDone(const Result<bool>& result) {
if (!result.ok()) {
TRACE("Abort: $0", result.status().ToString());
VLOG_WITH_PREFIX(4) << "Abort: " << result.status();
InvokeCallback(result.status());
return true;
}

if (result.get()) {
TRACE("No conflicts.");
VLOG_WITH_PREFIX(4) << "No conflicts: " << context_->GetResolutionHt();
InvokeCallback(context_->GetResolutionHt());
return true;
@@ -259,6 +261,8 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
void ResolveConflicts() {
VLOG_WITH_PREFIX(3) << "Conflicts: " << yb::ToString(conflicts_);
if (conflicts_.empty()) {
VTRACE(1, LogPrefix());
TRACE("No conflicts.");
InvokeCallback(context_->GetResolutionHt());
return;
}
@@ -393,6 +397,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
pending_requests_.store(remaining_transactions_);
for (auto& i : RemainingTransactions()) {
auto& transaction = i;
TRACE("FetchingTransactionStatus for $0", yb::ToString(transaction.id));
StatusRequest request = {
&transaction.id,
context_->GetResolutionHt(),
@@ -426,6 +431,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
pending_requests_.store(remaining_transactions_);
for (auto& i : RemainingTransactions()) {
auto& transaction = i;
TRACE("Aborting $0", yb::ToString(transaction.id));
status_manager().Abort(
transaction.id,
[self, &transaction](Result<TransactionStatusResult> result) {
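The `TRACE` calls added to conflict resolution use positional placeholders such as `$0` and `$1`. As a rough illustration of how such placeholders expand, here is a minimal sketch of positional substitution. `SubstituteSketch` is a hypothetical function written for this note; it is not the substitution routine the code base actually uses, and its handling of edge cases (missing arguments, `$$`) is an assumption.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch: replaces $0..$9 in `format` with the matching entry
// of `args`. "$$" yields a literal '$'. A placeholder whose index has no
// matching argument expands to nothing.
std::string SubstituteSketch(const std::string& format,
                             const std::vector<std::string>& args) {
  std::string out;
  for (std::size_t i = 0; i < format.size(); ++i) {
    const char c = format[i];
    if (c == '$' && i + 1 < format.size()) {
      const char next = format[i + 1];
      if (next >= '0' && next <= '9') {
        const std::size_t index = static_cast<std::size_t>(next - '0');
        if (index < args.size()) out += args[index];
        ++i;  // Skip the digit that was just consumed.
        continue;
      }
      if (next == '$') {
        out += '$';
        ++i;  // Skip the second '$'.
        continue;
      }
    }
    out += c;
  }
  return out;
}
```

With this sketch, `SubstituteSketch("Aborting $0", {"txn-1"})` expands to `Aborting txn-1`, the same shape as the messages added in this file.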
4 changes: 2 additions & 2 deletions src/yb/docdb/docdb_rocksdb_util.cc
@@ -247,7 +247,7 @@ void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key,
int* next_count, int* seek_count) {
for (int nexts = FLAGS_max_nexts_to_avoid_seek; nexts-- > 0;) {
if (!iter->Valid() || iter->key().compare(seek_key) >= 0) {
VTRACE(2, "Did $0 Next(s) instead of a Seek", nexts);
VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts);
return;
}
VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key());
@@ -256,7 +256,7 @@ void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key,
++*next_count;
}

VTRACE(2, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek);
VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek);
iter->Seek(seek_key);
++*seek_count;
}
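Moving these two messages from `VTRACE(2, ...)` to `VTRACE(3, ...)` presumably means they are recorded only at a higher tracing verbosity. The sketch below illustrates level gating in general, assuming a message is recorded only when its level does not exceed a configured tracing level. `LeveledTrace` is a hypothetical type, and the real `VTRACE` macro may be implemented differently.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch of a trace buffer with verbosity gating.
struct LeveledTrace {
  // Stand-in for a configured tracing level (an assumption, not a real flag).
  int tracing_level = 0;
  std::vector<std::string> entries;

  // Unconditional entry, analogous in spirit to a plain TRACE call.
  void Trace(std::string message) { entries.push_back(std::move(message)); }

  // Gated entry: recorded only when `level` is within the configured level.
  void VTrace(int level, std::string message) {
    if (level <= tracing_level) entries.push_back(std::move(message));
  }
};
```

Under this model, a run at tracing level 2 would keep level 1 and level 2 messages but drop the seek messages once they are tagged with level 3.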
1 change: 1 addition & 0 deletions src/yb/rpc/local_call.cc
@@ -31,6 +31,7 @@ LocalOutboundCall::LocalOutboundCall(
: OutboundCall(remote_method, outbound_call_metrics, /* method_metrics= */ nullptr,
response_storage, controller, rpc_metrics, std::move(callback),
/* callback_thread_pool= */ nullptr) {
TRACE_TO(trace_, "LocalOutboundCall");
}

Status LocalOutboundCall::SetRequestParam(
12 changes: 6 additions & 6 deletions src/yb/rpc/outbound_call.cc
@@ -151,16 +151,15 @@ OutboundCall::OutboundCall(const RemoteMethod* remote_method,
start_(CoarseMonoClock::Now()),
controller_(DCHECK_NOTNULL(controller)),
response_(DCHECK_NOTNULL(response_storage)),
trace_(new Trace),
call_id_(NextCallId()),
remote_method_(remote_method),
callback_(std::move(callback)),
callback_thread_pool_(callback_thread_pool),
trace_(new Trace),
outbound_call_metrics_(outbound_call_metrics),
rpc_metrics_(rpc_metrics),
method_metrics_(std::move(method_metrics)) {
// Avoid expensive conn_id.ToString() in production.
TRACE_TO_WITH_TIME(trace_, start_, "Outbound Call initiated.");
TRACE_TO_WITH_TIME(trace_, start_, "$0.", remote_method_->ToString());

if (Trace::CurrentTrace()) {
Trace::CurrentTrace()->AddChildTrace(trace_.get());
@@ -349,10 +348,10 @@ void OutboundCall::InvokeCallback() {
if (callback_thread_pool_) {
callback_task_.SetOutboundCall(shared_from(this));
callback_thread_pool_->Enqueue(&callback_task_);
TRACE_TO(trace_, "Callback called asynchronously.");
TRACE_TO(trace_, "Callback will be called asynchronously.");
} else {
InvokeCallbackSync();
TRACE_TO(trace_, "Callback called.");
TRACE_TO(trace_, "Callback called synchronously.");
}
}

@@ -388,6 +387,8 @@ void OutboundCall::SetResponse(CallResponse&& resp) {

auto now = CoarseMonoClock::Now();
TRACE_TO_WITH_TIME(trace_, now, "Response received.");
// Avoid expensive conn_id.ToString() in production.
VTRACE_TO(1, trace_, "from $0", conn_id_.ToString());
// Track time taken to be responded.

if (outbound_call_metrics_) {
@@ -459,7 +460,6 @@ void OutboundCall::SetFinished() {
if (SetState(FINISHED_SUCCESS)) {
InvokeCallback();
}
TRACE_TO(trace_, "Callback called.");
}

void OutboundCall::SetFailed(const Status &status, std::unique_ptr<ErrorStatusPB> err_pb) {
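The reworded messages in `OutboundCall::InvokeCallback()` reflect what has happened by the time the trace entry is written: on the thread-pool path the callback has only been enqueued, while on the other path it has already run. A minimal sketch of that distinction follows. `TaskQueueSketch` and `CallSketch` are hypothetical, single-threaded stand-ins written for illustration and do not reproduce the real thread pool or call classes.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for a callback thread pool.
class TaskQueueSketch {
 public:
  void Enqueue(std::function<void()> task) { tasks_.push(std::move(task)); }

  // Runs every queued task; a real pool would do this on worker threads.
  void RunAll() {
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop();
      task();
    }
  }

 private:
  std::queue<std::function<void()>> tasks_;
};

// Hypothetical call object that records its trace as plain strings.
class CallSketch {
 public:
  CallSketch(TaskQueueSketch* pool, std::function<void()> callback)
      : pool_(pool), callback_(std::move(callback)) {}

  void InvokeCallback() {
    if (pool_ != nullptr) {
      // The callback has not run yet at this point, hence "will be called".
      pool_->Enqueue(callback_);
      trace_.push_back("Callback will be called asynchronously.");
    } else {
      // The callback has already completed when the entry is recorded.
      callback_();
      trace_.push_back("Callback called synchronously.");
    }
  }

  const std::vector<std::string>& trace() const { return trace_; }

 private:
  TaskQueueSketch* pool_;
  std::function<void()> callback_;
  std::vector<std::string> trace_;
};
```

Recording the message inside each branch also makes a separate trailing "Callback called." entry redundant, which is consistent with its removal from `SetFinished()` in this diff.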
6 changes: 3 additions & 3 deletions src/yb/rpc/outbound_call.h
@@ -342,6 +342,9 @@ class OutboundCall : public RpcCall {
// Can be used only while callback_ object is alive.
google::protobuf::Message* response_;

// The trace buffer.
scoped_refptr<Trace> trace_;

private:
friend class RpcController;

@@ -400,9 +403,6 @@ class OutboundCall : public RpcCall {
// Once a response has been received for this call, contains that response.
CallResponse call_response_;

// The trace buffer.
scoped_refptr<Trace> trace_;

std::shared_ptr<OutboundCallMetrics> outbound_call_metrics_;

RpcMetrics* rpc_metrics_;
19 changes: 17 additions & 2 deletions src/yb/rpc/rpc.cc
@@ -45,6 +45,7 @@
#include "yb/util/flag_tags.h"
#include "yb/util/random_util.h"
#include "yb/util/tsan_util.h"
#include "yb/util/trace.h"

using namespace std::literals;
using namespace std::placeholders;
@@ -77,6 +78,14 @@ using strings::SubstituteAndAppend;

namespace rpc {

RpcCommand::RpcCommand() : trace_(new Trace) {
if (Trace::CurrentTrace()) {
Trace::CurrentTrace()->AddChildTrace(trace_.get());
}
}

RpcCommand::~RpcCommand() {}

RpcRetrier::RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache)
: start_(CoarseMonoClock::now()),
deadline_(deadline),
@@ -193,8 +202,10 @@ void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) {
}
task_id_ = kInvalidTaskId;
if (!run) {
rpc->Finished(STATUS_FORMAT(
Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state)));
auto status = STATUS_FORMAT(
Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state));
VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", status.ToString());
rpc->Finished(status);
return;
}
Status new_status = status;
@@ -214,13 +225,15 @@ void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) {
}
if (new_status.ok()) {
controller_.Reset();
VTRACE_TO(1, rpc->trace(), "Sending Rpc");
rpc->SendRpc();
} else {
// Service unavailable here means that we failed to schedule the delayed task, i.e. the reactor
// is shut down.
if (new_status.IsServiceUnavailable()) {
new_status = STATUS_FORMAT(Aborted, "Aborted because of $0", new_status);
}
VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", new_status.ToString());
rpc->Finished(new_status);
}
expected_state = RpcRetrierState::kRunning;
@@ -275,6 +288,7 @@ void Rpc::ScheduleRetry(const Status& status) {
auto retry_status = mutable_retrier()->DelayedRetry(this, status);
if (!retry_status.ok()) {
LOG(WARNING) << "Failed to schedule retry: " << retry_status;
VTRACE_TO(1, trace(), "Rpc Finished with status $0", retry_status.ToString());
Finished(retry_status);
}
}
@@ -348,6 +362,7 @@ bool Rpcs::RegisterAndStart(RpcCommandPtr call, Handle* handle) {
return false;
}

VTRACE_TO(1, (***handle).trace(), "Sending Rpc");
(***handle).SendRpc();
return true;
}
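The `RpcCommand` constructor added in this file creates the trace once in the base class and attaches it to the current thread's trace, so subclasses such as `AsyncRpc` and `TransactionRpcBase` no longer need their own copies of that logic. The sketch below shows the general pattern under simplifying assumptions. `SketchTrace`, `SketchCommand`, and `SketchWriteRpc` are hypothetical names; the real code uses `scoped_refptr<Trace>` and `Trace::CurrentTrace()`, which this sketch replaces with `std::shared_ptr` and a `thread_local` pointer.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a trace buffer with child traces.
class SketchTrace {
 public:
  void Add(std::string message) { entries_.push_back(std::move(message)); }
  void AddChild(std::shared_ptr<SketchTrace> child) {
    children_.push_back(std::move(child));
  }
  const std::vector<std::string>& entries() const { return entries_; }
  const std::vector<std::shared_ptr<SketchTrace>>& children() const {
    return children_;
  }

  // Trace adopted by the calling thread, or nullptr when there is none.
  static SketchTrace*& Current() {
    thread_local SketchTrace* current = nullptr;
    return current;
  }

 private:
  std::vector<std::string> entries_;
  std::vector<std::shared_ptr<SketchTrace>> children_;
};

// Base class that owns the trace, so every derived command gets one.
class SketchCommand {
 public:
  SketchCommand() : trace_(std::make_shared<SketchTrace>()) {
    // Link this command's trace under the thread's current trace, if any.
    if (SketchTrace* parent = SketchTrace::Current()) {
      parent->AddChild(trace_);
    }
  }
  virtual ~SketchCommand() = default;

  SketchTrace* trace() { return trace_.get(); }

 protected:
  std::shared_ptr<SketchTrace> trace_;
};

// A derived command only adds entries; it does not create or link the trace.
class SketchWriteRpc : public SketchCommand {
 public:
  SketchWriteRpc() { trace_->Add("WriteRpc initiated"); }
};
```

Keeping creation and linking in one place means a newly defined command type picks up tracing without any extra code, which matches the stated goal of making the trace usable across all defined RPCs.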
12 changes: 10 additions & 2 deletions src/yb/rpc/rpc.h
@@ -50,6 +50,8 @@

namespace yb {

class Trace;

namespace rpc {

class Messenger;
@@ -58,6 +60,8 @@ class Rpc;
// The command that could be retried by RpcRetrier.
class RpcCommand : public std::enable_shared_from_this<RpcCommand> {
public:
RpcCommand();

// Asynchronously sends the RPC to the remote end.
//
// Subclasses should use Finished() below as the callback function.
@@ -74,8 +78,13 @@ class RpcCommand : public std::enable_shared_from_this<RpcCommand> {

virtual CoarseTimePoint deadline() const = 0;

Trace* trace() { return trace_.get(); }

protected:
~RpcCommand() {}
virtual ~RpcCommand();

// The trace buffer.
scoped_refptr<Trace> trace_;
};

YB_DEFINE_ENUM(RpcRetrierState, (kIdle)(kRunning)(kScheduling)(kWaiting)(kFinished));
@@ -204,7 +213,6 @@ class Rpc : public RpcCommand {
}

void ScheduleRetry(const Status& status);

protected:
const RpcRetrier& retrier() const { return retrier_; }
RpcRetrier* mutable_retrier() { return &retrier_; }
2 changes: 1 addition & 1 deletion src/yb/rpc/service_pool.cc
@@ -257,7 +257,7 @@ class ServicePoolImpl final : public InboundCallHandler {
} else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) {
error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue.";
} else {
TRACE_TO(incoming->trace(), "Handling call");
TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name()));

if (incoming->TryStartProcessing()) {
service_->Handle(std::move(incoming));
2 changes: 1 addition & 1 deletion src/yb/rpc/yb_rpc.cc
@@ -418,7 +418,7 @@ void YBInboundCall::LogTrace() const {
// The traces may also be too large to fit in a log message.
LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
<< header_.timeout_ms << "ms).";
std::string s = trace_->DumpToString("==>", true);
std::string s = trace_->DumpToString(1, true);
if (!s.empty()) {
LOG(WARNING) << "Trace:\n" << s;
}
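Here `DumpToString` switches from a string first argument to an integer one. Given summary item 6, the integer plausibly limits how many levels of child traces are printed, though that reading is an assumption and the diff for `trace.h` is not shown here. The sketch below illustrates a depth-limited dump of nested traces. `MiniTrace` and its methods are hypothetical and are not YugabyteDB's `Trace` API; the `.. ` indentation and the `Related trace:` label are borrowed from the example output.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a trace buffer; not YugabyteDB's Trace class.
class MiniTrace {
 public:
  void Add(std::string message) { entries_.push_back(std::move(message)); }

  void AddChild(std::shared_ptr<MiniTrace> child) {
    children_.push_back(std::move(child));
  }

  // Dumps this trace's entries, then recurses into child traces while
  // `max_child_depth` is positive. A value of 0 prints only this trace.
  std::string DumpToString(int max_child_depth, int level = 0) const {
    std::string prefix;
    for (int i = 0; i <= level; ++i) prefix += ".. ";

    std::string out;
    for (const std::string& entry : entries_) out += prefix + entry + "\n";

    if (max_child_depth > 0) {
      for (const std::shared_ptr<MiniTrace>& child : children_) {
        // The label is indented one level deeper, like the child's entries.
        out += prefix + ".. Related trace:\n";
        out += child->DumpToString(max_child_depth - 1, level + 1);
      }
    }
    return out;
  }

 private:
  std::vector<std::string> entries_;
  std::vector<std::shared_ptr<MiniTrace>> children_;
};
```

A depth limit keeps slow-call warnings readable: the top-level entries are always printed, and nested RPC traces appear only as far down as the caller asks.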
1 change: 1 addition & 0 deletions src/yb/tablet/operations/operation_driver.cc
@@ -179,6 +179,7 @@ void OperationDriver::ExecuteAsync() {
VLOG_WITH_PREFIX(4) << "ExecuteAsync()";
TRACE_EVENT_FLOW_BEGIN0("operation", "ExecuteAsync", this);
ADOPT_TRACE(trace());
TRACE_FUNC();

auto delay = GetAtomicFlag(&FLAGS_TEST_delay_execute_async_ms);
if (delay != 0 &&
4 changes: 4 additions & 0 deletions src/yb/tablet/running_transaction.cc
@@ -20,6 +20,7 @@

#include "yb/util/flag_tags.h"
#include "yb/util/tsan_util.h"
#include "yb/util/trace.h"
#include "yb/util/yb_pg_errcodes.h"

using namespace std::placeholders;
@@ -206,6 +207,8 @@ boost::optional<TransactionStatus> RunningTransaction::GetStatusAt(

void RunningTransaction::SendStatusRequest(
int64_t serial_no, const RunningTransactionPtr& shared_self) {
TRACE_FUNC();
VTRACE(1, yb::ToString(metadata_.transaction_id));
tserver::GetTransactionStatusRequestPB req;
req.set_tablet_id(metadata_.status_tablet);
req.add_transaction_id()->assign(
@@ -268,6 +271,7 @@ void RunningTransaction::DoStatusReceived(const Status& status,
const tserver::GetTransactionStatusResponsePB& response,
int64_t serial_no,
const RunningTransactionPtr& shared_self) {
TRACE("$0: $1", __func__, response.ShortDebugString());
VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
<< serial_no << ")";
