From 9f63710168c478461ac93e924d314f67390124e5 Mon Sep 17 00:00:00 2001 From: Amitanand Aiyer Date: Fri, 12 Nov 2021 10:20:07 -0800 Subject: [PATCH] [#10660] [YBASE] Refactor Traces: Define trace in RpcCommand so that it can be used across all defined Rpcs Summary: 1) Define trace in RpcCommand so that it can be used across all defined RPCs - remove trace_ from transaction_rpc.cc because we already have it in the superclass 2) Add begin/end to demarcate the traces received from the server. 3) Plumb through the tracing for write ops. The initial traces in the handler threads were being missed earlier. 4) Adding traces to SharedLockManager. TO BE FIXED. Does not decode keys properly. 5) Add more tracing to the conflict resolution part 6) Add a tracing depth flag for controlling how many levels of children traces are to be printed. 7) Populate trace_buffer even upon write failures. Changes the write failure message to 23954 2021-11-12 22:57:42.347 UTC [12299] STATEMENT: UPDATE t SET value = 'NEW' WHERE key = 0 23955 ../../src/yb/yql/pgwrapper/pg_mini-test.cc:214: Failure 23956 Failed 23957 Bad status: Network error (yb/yql/pgwrapper/libpq_utils.cc:226): Execute of 'UPDATE t SET value = 'NEW' WHERE key = 0' failed: 7, message: ERROR: All transparent retries exhausted. Operation failed. Try again.: 84f3da10-d961-4251-97ef-96f197810acf Conflicts with higher priority transaction: 7420b446-874c-4ab c-8d77-c51b33f93008 23958 (pgsql error 40001) 23959 W1112 22:57:43.135255 11959 outbound_call.cc:380] RPC callback for RPC call yb.tserver.TabletServerService.GetTransactionStatus -> { remote: 127.0.0.5:19056 idx: 2 protocol: 0x00007f0a2ba62ea0 -> tcpc } , state=FINISHED_SUCCESS. took 0.135s Test Plan: jenkins. run a test rb -- --cxx-test pg_mini-test --gtest_filter PgMiniTest.FollowerReadsWithWrite --test_args --collect_end_to_end_traces=trust_args --ybclient_print_trace_every_n=1 --test_args --tracing_level=1 and look for the traces. Example output: I1113 06:14:35.862326 16976 async_rpc.cc:160] Write(tablet: 80956799d32543be8c74bbfd97cae8e9, num_ops: 1, num_attempts: 1, txn: ca97464e-1ee3-4dc4-a4a1-aaf746a4efd8, subtxn: [none]) took 8000us. Trace: .. 1113 06:14:35.854683 (+ 0us) async_rpc.cc:487] WriteRpc initiated .. 1113 06:14:35.854683 (+ 0us) async_rpc.cc:488] Tablet 80956799d32543be8c74bbfd97cae8e9 table yugabyte.t .. 1113 06:14:35.854683 (+ 0us) async_rpc.cc:169] SendRpc() called. .. 1113 06:14:35.854683 (+ 0us) tablet_rpc.cc:101] SelectTabletServer() .. 1113 06:14:35.854683 (+ 0us) tablet_rpc.cc:153] Selected { uuid: 53a481d52a984d1a8b7e96d249aff21f private: [host: "127.0.0.6" port: 26739] public: [host: "127.0.0.7.ip.yugabyte" port: 26739] cloud_info: placement_cloud: "cloud2" placement_region: "rack3" placement_zone: "zone" .. 1113 06:14:35.854683 (+ 0us) async_rpc.cc:547] SendRpcToTserver .. 1113 06:14:35.855683 (+ 1000us) async_rpc.cc:552] RpcDispatched Asynchronously .. 1113 06:14:35.861684 (+ 6001us) tablet_rpc.cc:308] Done(OK) .. 1113 06:14:35.861684 (+ 0us) async_rpc.cc:413] ProcessResponseFromTserver(Operation failed. Try again.: ca97464e-1ee3-4dc4-a4a1-aaf746a4efd8 Conflicts with higher priority transaction: 52ec72c4-c941-4f40-96ad-3478a3ca0528 (transaction error 3)) .. 1113 06:14:35.861684 (+ 0us) async_rpc.cc:415] Received from server: BEGIN .. 1113 06:14:35.855820 (+ 0us) inbound_call.cc:90] Created InboundCall .. 1113 06:14:35.855820 (+ 0us) service_pool.cc:180] Inserting onto call queue .. 1113 06:14:35.855820 (+ 0us) service_pool.cc:261] Handling call Write .. 1113 06:14:35.855820 (+ 0us) tablet_service.cc:1591] Start Write .. 1113 06:14:35.855820 (+ 0us) tablet.cc:2019] AcquireLocksAndPerformDocOperations .. 1113 06:14:35.855820 (+ 0us) tablet.cc:3776] Acquiring write permit .. 1113 06:14:35.856820 (+ 1000us) operation_counter.cc:187] ScopedRWOperation Tablet schema .. 1113 06:14:35.856820 (+ 0us) tablet.cc:3777] Acquiring write permit done .. 1113 06:14:35.856820 (+ 0us) operation_counter.cc:199] Reset null .. 1113 06:14:35.856820 (+ 0us) operation_counter.cc:199] Reset null .. 1113 06:14:35.856820 (+ 0us) operation_counter.cc:187] ScopedRWOperation RocksDB non-abortable read/write operations .. 1113 06:14:35.856820 (+ 0us) operation_counter.cc:199] Reset null .. 1113 06:14:35.856820 (+ 0us) shared_lock_manager.cc:255] Locking a batch of 3 keys .. 1113 06:14:35.856820 (+ 0us) shared_lock_manager.cc:295] Acquired a lock batch of 3 keys .. 1113 06:14:35.856820 (+ 0us) conflict_resolution.cc:966] ResolveTransactionConflicts .. 1113 06:14:35.857821 (+ 1001us) conflict_resolution.cc:401] FetchingTransactionStatus for 52ec72c4-c941-4f40-96ad-3478a3ca0528 .. 1113 06:14:35.857821 (+ 0us) running_transaction.cc:210] SendStatusRequest : 52ec72c4-c941-4f40-96ad-3478a3ca0528 .. 1113 06:14:35.857821 (+ 0us) conflict_resolution.cc:973] resolver->Resolve done .. 1113 06:14:35.857821 (+ 0us) operation_counter.cc:199] Reset null .. 1113 06:14:35.857821 (+ 0us) operation_counter.cc:199] Reset null .. .. Related trace: .. .. 1113 06:14:35.857821 (+ 0us) transaction_rpc.cc:104] GetTransactionStatus .. .. 1113 06:14:35.857821 (+ 0us) transaction_rpc.cc:105] tablet_id: "8d36c698a53745d99d0f4a15e6f52206" transaction_id: "R\354r\304\311AO@\226\2554x\243\312\005(" propagated_hybrid_time: 6704267574709948416 .. .. 1113 06:14:35.857821 (+ 0us) transaction_rpc.cc:56] SendRpc .. .. 1113 06:14:35.857821 (+ 0us) tablet_rpc.cc:101] SelectTabletServer() .. .. 1113 06:14:35.857821 (+ 0us) tablet_rpc.cc:153] Selected { uuid: 320973da990b4ce7ba3915b1075f0848 private: [host: "127.0.0.2" port: 22188] public: [host: "127.0.0.3.ip.yugabyte" port: 22188] cloud_info: placement_cloud: "cloud1" placement_region: "rack1" placement_zone: "zone" .. .. 1113 06:14:35.857821 (+ 0us) transaction_rpc.cc:131] InvokeAsync .. .. 1113 06:14:35.860821 (+ 3000us) transaction_rpc.cc:61] Finished .. .. 1113 06:14:35.860821 (+ 0us) tablet_rpc.cc:308] Done(OK) .. .. 1113 06:14:35.860821 (+ 0us) transaction_rpc.cc:124] InvokeCallback .. .. Related trace: .. .. 1113 06:14:35.857821 (+ 0us) outbound_call.cc:165] yb.tserver.TabletServerService.GetTransactionStatus. .. .. 1113 06:14:35.857821 (+ 0us) reactor.cc:736] Scheduled. .. .. 1113 06:14:35.857821 (+ 0us) outbound_call.cc:441] Queued. .. .. 1113 06:14:35.857821 (+ 0us) outbound_call.cc:451] Call Sent. .. .. 1113 06:14:35.860821 (+ 3000us) outbound_call.cc:392] Response received. .. .. 1113 06:14:35.860821 (+ 0us) outbound_call.cc:393] from { remote: 127.0.0.3:22188 idx: 7 protocol: 0x00007f529eaa6ea0 -> tcpc } .. .. 1113 06:14:35.860821 (+ 0us) outbound_call.cc:354] Callback called asynchronously. END. .. .. Related trace: .. .. 1113 06:14:35.854683 (+ 0us) outbound_call.cc:165] yb.tserver.TabletServerService.Write. .. .. 1113 06:14:35.855683 (+ 1000us) reactor.cc:736] Scheduled. .. .. 1113 06:14:35.855683 (+ 0us) outbound_call.cc:441] Queued. .. .. 1113 06:14:35.855683 (+ 0us) outbound_call.cc:451] Call Sent. .. .. 1113 06:14:35.861684 (+ 6001us) outbound_call.cc:392] Response received. .. .. 1113 06:14:35.861684 (+ 0us) outbound_call.cc:393] from { remote: 127.0.0.7:26739 idx: 0 protocol: 0x00007f3147488cc0 -> tcp } .. .. 1113 06:14:35.861684 (+ 0us) outbound_call.cc:354] Callback called asynchronously. Reviewers: hbhanawat, rthallam, rsami Reviewed By: rsami Subscribers: ybase, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D13938 --- src/yb/client/async_rpc.cc | 11 +-- src/yb/client/async_rpc.h | 4 - src/yb/client/transaction_rpc.cc | 3 +- src/yb/docdb/conflict_resolution.cc | 6 ++ src/yb/docdb/docdb_rocksdb_util.cc | 4 +- src/yb/rpc/local_call.cc | 1 + src/yb/rpc/outbound_call.cc | 12 +-- src/yb/rpc/outbound_call.h | 6 +- src/yb/rpc/rpc.cc | 19 +++- src/yb/rpc/rpc.h | 12 ++- src/yb/rpc/service_pool.cc | 2 +- src/yb/rpc/yb_rpc.cc | 2 +- src/yb/tablet/operations/operation_driver.cc | 1 + src/yb/tablet/running_transaction.cc | 4 + src/yb/tserver/tablet_service.cc | 19 +++- src/yb/util/trace.cc | 91 +++++++++++--------- src/yb/util/trace.h | 15 ++-- 17 files changed, 132 insertions(+), 80 deletions(-) diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index 06c093cae58d..265b1df45b42 100644 --- a/src/yb/client/async_rpc.cc +++ b/src/yb/client/async_rpc.cc @@ -63,8 +63,6 @@ METRIC_DEFINE_counter(server, consistent_prefix_failed_reads, yb::MetricUnit::kRequests, "Number of consistent prefix reads that failed to be served by the closest replica."); -DECLARE_bool(collect_end_to_end_traces); - DEFINE_int32(ybclient_print_trace_every_n, 0, "Controls the rate at which traces from ybclient are printed. Setting this to 0 " "disables printing the collected traces."); @@ -84,9 +82,10 @@ DEFINE_bool(detect_duplicates_for_retryable_requests, true, DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false, "When true, forward the PGSQL rpcs to the local tServer."); - DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b); +DECLARE_bool(collect_end_to_end_traces); + using namespace std::placeholders; namespace yb { @@ -134,7 +133,6 @@ AsyncRpc::AsyncRpc( const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level) : Rpc(data.batcher->deadline(), data.batcher->messenger(), &data.batcher->proxy_cache()), batcher_(data.batcher), - trace_(new Trace), ops_(data.ops), tablet_invoker_(LocalTabletServerOnly(ops_), yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX, @@ -149,9 +147,6 @@ AsyncRpc::AsyncRpc( async_rpc_metrics_(data.batcher->async_rpc_metrics()) { mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread( data.allow_local_calls_in_curr_thread); - if (Trace::CurrentTrace()) { - Trace::CurrentTrace()->AddChildTrace(trace_.get()); - } } AsyncRpc::~AsyncRpc() { @@ -416,7 +411,7 @@ template void AsyncRpcBase::ProcessResponseFromTserver(const Status& status) { TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); if (resp_.has_trace_buffer()) { - TRACE_TO(trace_, "Received from server: $0", resp_.trace_buffer()); + TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); } NotifyBatcher(status); if (!CommonResponseCheck(status)) { diff --git a/src/yb/client/async_rpc.h b/src/yb/client/async_rpc.h index 9c684930e3e2..f7069ff5dc63 100644 --- a/src/yb/client/async_rpc.h +++ b/src/yb/client/async_rpc.h @@ -86,7 +86,6 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc { std::shared_ptr table() const; const RemoteTablet& tablet() const { return *tablet_invoker_.tablet(); } const InFlightOps& ops() const { return ops_; } - Trace *trace() { return trace_.get(); } protected: void Finished(const Status& status) override; @@ -117,9 +116,6 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc { // completes, regardless of success or failure. BatcherPtr batcher_; - // The trace buffer. - scoped_refptr trace_; - // Operations which were batched into this RPC. // These operations are in kRequestSent state. InFlightOps ops_; diff --git a/src/yb/client/transaction_rpc.cc b/src/yb/client/transaction_rpc.cc index 664a564f88b2..ebfcc4d8cb87 100644 --- a/src/yb/client/transaction_rpc.cc +++ b/src/yb/client/transaction_rpc.cc @@ -38,7 +38,6 @@ class TransactionRpcBase : public rpc::Rpc, public internal::TabletRpc { internal::RemoteTablet* tablet, YBClient* client) : rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()), - trace_(new Trace), invoker_(false /* local_tserver_only */, false /* consistent_prefix */, client, @@ -84,7 +83,6 @@ class TransactionRpcBase : public rpc::Rpc, public internal::TabletRpc { rpc::RpcController* controller, rpc::ResponseCallback callback) = 0; - TracePtr trace_; internal::TabletInvoker invoker_; }; @@ -100,6 +98,7 @@ class TransactionRpc : public TransactionRpcBase { : TransactionRpcBase(deadline, tablet, client), callback_(std::move(callback)) { req_.Swap(req); + TRACE_TO(trace_, Traits::kName); } virtual ~TransactionRpc() {} diff --git a/src/yb/docdb/conflict_resolution.cc b/src/yb/docdb/conflict_resolution.cc index ec0972e3b005..590a15b94731 100644 --- a/src/yb/docdb/conflict_resolution.cc +++ b/src/yb/docdb/conflict_resolution.cc @@ -242,12 +242,14 @@ class ConflictResolver : public std::enable_shared_from_this { MUST_USE_RESULT bool CheckResolutionDone(const Result& result) { if (!result.ok()) { + TRACE("Abort: $0", result.status().ToString()); VLOG_WITH_PREFIX(4) << "Abort: " << result.status(); InvokeCallback(result.status()); return true; } if (result.get()) { + TRACE("No conflicts."); VLOG_WITH_PREFIX(4) << "No conflicts: " << context_->GetResolutionHt(); InvokeCallback(context_->GetResolutionHt()); return true; @@ -259,6 +261,8 @@ class ConflictResolver : public std::enable_shared_from_this { void ResolveConflicts() { VLOG_WITH_PREFIX(3) << "Conflicts: " << yb::ToString(conflicts_); if (conflicts_.empty()) { + VTRACE(1, LogPrefix()); + TRACE("No conflicts."); InvokeCallback(context_->GetResolutionHt()); return; } @@ -393,6 +397,7 @@ class ConflictResolver : public std::enable_shared_from_this { pending_requests_.store(remaining_transactions_); for (auto& i : RemainingTransactions()) { auto& transaction = i; + TRACE("FetchingTransactionStatus for $0", yb::ToString(transaction.id)); StatusRequest request = { &transaction.id, context_->GetResolutionHt(), @@ -426,6 +431,7 @@ class ConflictResolver : public std::enable_shared_from_this { pending_requests_.store(remaining_transactions_); for (auto& i : RemainingTransactions()) { auto& transaction = i; + TRACE("Aborting $0", yb::ToString(transaction.id)); status_manager().Abort( transaction.id, [self, &transaction](Result result) { diff --git a/src/yb/docdb/docdb_rocksdb_util.cc b/src/yb/docdb/docdb_rocksdb_util.cc index 165477d797a5..ccc319ff65a4 100644 --- a/src/yb/docdb/docdb_rocksdb_util.cc +++ b/src/yb/docdb/docdb_rocksdb_util.cc @@ -247,7 +247,7 @@ void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key, int* next_count, int* seek_count) { for (int nexts = FLAGS_max_nexts_to_avoid_seek; nexts-- > 0;) { if (!iter->Valid() || iter->key().compare(seek_key) >= 0) { - VTRACE(2, "Did $0 Next(s) instead of a Seek", nexts); + VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts); return; } VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key()); @@ -256,7 +256,7 @@ void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key, ++*next_count; } - VTRACE(2, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek); + VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek); iter->Seek(seek_key); ++*seek_count; } diff --git a/src/yb/rpc/local_call.cc b/src/yb/rpc/local_call.cc index 22429d98281f..391b6fe42ba8 100644 --- a/src/yb/rpc/local_call.cc +++ b/src/yb/rpc/local_call.cc @@ -31,6 +31,7 @@ LocalOutboundCall::LocalOutboundCall( : OutboundCall(remote_method, outbound_call_metrics, /* method_metrics= */ nullptr, response_storage, controller, rpc_metrics, std::move(callback), /* callback_thread_pool= */ nullptr) { + TRACE_TO(trace_, "LocalOutboundCall"); } Status LocalOutboundCall::SetRequestParam( diff --git a/src/yb/rpc/outbound_call.cc b/src/yb/rpc/outbound_call.cc index f5acf872111f..2dc64c5e9a35 100644 --- a/src/yb/rpc/outbound_call.cc +++ b/src/yb/rpc/outbound_call.cc @@ -151,16 +151,15 @@ OutboundCall::OutboundCall(const RemoteMethod* remote_method, start_(CoarseMonoClock::Now()), controller_(DCHECK_NOTNULL(controller)), response_(DCHECK_NOTNULL(response_storage)), + trace_(new Trace), call_id_(NextCallId()), remote_method_(remote_method), callback_(std::move(callback)), callback_thread_pool_(callback_thread_pool), - trace_(new Trace), outbound_call_metrics_(outbound_call_metrics), rpc_metrics_(rpc_metrics), method_metrics_(std::move(method_metrics)) { - // Avoid expensive conn_id.ToString() in production. - TRACE_TO_WITH_TIME(trace_, start_, "Outbound Call initiated."); + TRACE_TO_WITH_TIME(trace_, start_, "$0.", remote_method_->ToString()); if (Trace::CurrentTrace()) { Trace::CurrentTrace()->AddChildTrace(trace_.get()); @@ -349,10 +348,10 @@ void OutboundCall::InvokeCallback() { if (callback_thread_pool_) { callback_task_.SetOutboundCall(shared_from(this)); callback_thread_pool_->Enqueue(&callback_task_); - TRACE_TO(trace_, "Callback called asynchronously."); + TRACE_TO(trace_, "Callback will be called asynchronously."); } else { InvokeCallbackSync(); - TRACE_TO(trace_, "Callback called."); + TRACE_TO(trace_, "Callback called synchronously."); } } @@ -388,6 +387,8 @@ void OutboundCall::SetResponse(CallResponse&& resp) { auto now = CoarseMonoClock::Now(); TRACE_TO_WITH_TIME(trace_, now, "Response received."); + // Avoid expensive conn_id.ToString() in production. + VTRACE_TO(1, trace_, "from $0", conn_id_.ToString()); // Track time taken to be responded. if (outbound_call_metrics_) { @@ -459,7 +460,6 @@ void OutboundCall::SetFinished() { if (SetState(FINISHED_SUCCESS)) { InvokeCallback(); } - TRACE_TO(trace_, "Callback called."); } void OutboundCall::SetFailed(const Status &status, std::unique_ptr err_pb) { diff --git a/src/yb/rpc/outbound_call.h b/src/yb/rpc/outbound_call.h index 0b613bbe66e2..9d414178e7d3 100644 --- a/src/yb/rpc/outbound_call.h +++ b/src/yb/rpc/outbound_call.h @@ -342,6 +342,9 @@ class OutboundCall : public RpcCall { // Can be used only while callback_ object is alive. google::protobuf::Message* response_; + // The trace buffer. + scoped_refptr trace_; + private: friend class RpcController; @@ -400,9 +403,6 @@ class OutboundCall : public RpcCall { // Once a response has been received for this call, contains that response. CallResponse call_response_; - // The trace buffer. - scoped_refptr trace_; - std::shared_ptr outbound_call_metrics_; RpcMetrics* rpc_metrics_; diff --git a/src/yb/rpc/rpc.cc b/src/yb/rpc/rpc.cc index dc0a51e179ea..64466d8d173a 100644 --- a/src/yb/rpc/rpc.cc +++ b/src/yb/rpc/rpc.cc @@ -45,6 +45,7 @@ #include "yb/util/flag_tags.h" #include "yb/util/random_util.h" #include "yb/util/tsan_util.h" +#include "yb/util/trace.h" using namespace std::literals; using namespace std::placeholders; @@ -77,6 +78,14 @@ using strings::SubstituteAndAppend; namespace rpc { +RpcCommand::RpcCommand() : trace_(new Trace) { + if (Trace::CurrentTrace()) { + Trace::CurrentTrace()->AddChildTrace(trace_.get()); + } +} + +RpcCommand::~RpcCommand() {} + RpcRetrier::RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache) : start_(CoarseMonoClock::now()), deadline_(deadline), @@ -193,8 +202,10 @@ void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) { } task_id_ = kInvalidTaskId; if (!run) { - rpc->Finished(STATUS_FORMAT( - Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state))); + auto status = STATUS_FORMAT( + Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state)); + VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", status.ToString()); + rpc->Finished(status); return; } Status new_status = status; @@ -214,6 +225,7 @@ void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) { } if (new_status.ok()) { controller_.Reset(); + VTRACE_TO(1, rpc->trace(), "Sending Rpc"); rpc->SendRpc(); } else { // Service unavailable here means that we failed to to schedule delayed task, i.e. reactor @@ -221,6 +233,7 @@ void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) { if (new_status.IsServiceUnavailable()) { new_status = STATUS_FORMAT(Aborted, "Aborted because of $0", new_status); } + VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", new_status.ToString()); rpc->Finished(new_status); } expected_state = RpcRetrierState::kRunning; @@ -275,6 +288,7 @@ void Rpc::ScheduleRetry(const Status& status) { auto retry_status = mutable_retrier()->DelayedRetry(this, status); if (!retry_status.ok()) { LOG(WARNING) << "Failed to schedule retry: " << retry_status; + VTRACE_TO(1, trace(), "Rpc Finished with status $0", retry_status.ToString()); Finished(retry_status); } } @@ -348,6 +362,7 @@ bool Rpcs::RegisterAndStart(RpcCommandPtr call, Handle* handle) { return false; } + VTRACE_TO(1, (***handle).trace(), "Sending Rpc"); (***handle).SendRpc(); return true; } diff --git a/src/yb/rpc/rpc.h b/src/yb/rpc/rpc.h index 954df975d9d3..f128f07f578a 100644 --- a/src/yb/rpc/rpc.h +++ b/src/yb/rpc/rpc.h @@ -50,6 +50,8 @@ namespace yb { +class Trace; + namespace rpc { class Messenger; @@ -58,6 +60,8 @@ class Rpc; // The command that could be retried by RpcRetrier. class RpcCommand : public std::enable_shared_from_this { public: + RpcCommand(); + // Asynchronously sends the RPC to the remote end. // // Subclasses should use Finished() below as the callback function. @@ -74,8 +78,13 @@ class RpcCommand : public std::enable_shared_from_this { virtual CoarseTimePoint deadline() const = 0; + Trace* trace() { return trace_.get(); } + protected: - ~RpcCommand() {} + virtual ~RpcCommand(); + + // The trace buffer. + scoped_refptr trace_; }; YB_DEFINE_ENUM(RpcRetrierState, (kIdle)(kRunning)(kScheduling)(kWaiting)(kFinished)); @@ -204,7 +213,6 @@ class Rpc : public RpcCommand { } void ScheduleRetry(const Status& status); - protected: const RpcRetrier& retrier() const { return retrier_; } RpcRetrier* mutable_retrier() { return &retrier_; } diff --git a/src/yb/rpc/service_pool.cc b/src/yb/rpc/service_pool.cc index 3a200fe2911f..3d6304fe975b 100644 --- a/src/yb/rpc/service_pool.cc +++ b/src/yb/rpc/service_pool.cc @@ -257,7 +257,7 @@ class ServicePoolImpl final : public InboundCallHandler { } else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) { error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue."; } else { - TRACE_TO(incoming->trace(), "Handling call"); + TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name())); if (incoming->TryStartProcessing()) { service_->Handle(std::move(incoming)); diff --git a/src/yb/rpc/yb_rpc.cc b/src/yb/rpc/yb_rpc.cc index e2d5a026fbeb..ba7112ec89df 100644 --- a/src/yb/rpc/yb_rpc.cc +++ b/src/yb/rpc/yb_rpc.cc @@ -418,7 +418,7 @@ void YBInboundCall::LogTrace() const { // The traces may also be too large to fit in a log message. LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout " << header_.timeout_ms << "ms)."; - std::string s = trace_->DumpToString("==>", true); + std::string s = trace_->DumpToString(1, true); if (!s.empty()) { LOG(WARNING) << "Trace:\n" << s; } diff --git a/src/yb/tablet/operations/operation_driver.cc b/src/yb/tablet/operations/operation_driver.cc index ebbe02663ff1..55862dd21035 100644 --- a/src/yb/tablet/operations/operation_driver.cc +++ b/src/yb/tablet/operations/operation_driver.cc @@ -179,6 +179,7 @@ void OperationDriver::ExecuteAsync() { VLOG_WITH_PREFIX(4) << "ExecuteAsync()"; TRACE_EVENT_FLOW_BEGIN0("operation", "ExecuteAsync", this); ADOPT_TRACE(trace()); + TRACE_FUNC(); auto delay = GetAtomicFlag(&FLAGS_TEST_delay_execute_async_ms); if (delay != 0 && diff --git a/src/yb/tablet/running_transaction.cc b/src/yb/tablet/running_transaction.cc index 066730d2fd30..006f57388d5e 100644 --- a/src/yb/tablet/running_transaction.cc +++ b/src/yb/tablet/running_transaction.cc @@ -20,6 +20,7 @@ #include "yb/util/flag_tags.h" #include "yb/util/tsan_util.h" +#include "yb/util/trace.h" #include "yb/util/yb_pg_errcodes.h" using namespace std::placeholders; @@ -206,6 +207,8 @@ boost::optional RunningTransaction::GetStatusAt( void RunningTransaction::SendStatusRequest( int64_t serial_no, const RunningTransactionPtr& shared_self) { + TRACE_FUNC(); + VTRACE(1, yb::ToString(metadata_.transaction_id)); tserver::GetTransactionStatusRequestPB req; req.set_tablet_id(metadata_.status_tablet); req.add_transaction_id()->assign( @@ -268,6 +271,7 @@ void RunningTransaction::DoStatusReceived(const Status& status, const tserver::GetTransactionStatusResponsePB& response, int64_t serial_no, const RunningTransactionPtr& shared_self) { + TRACE("$0: $1", __func__, response.ShortDebugString()); VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", " << serial_no << ")"; diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 50eab00b2c95..26389319ebab 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -410,8 +410,13 @@ class WriteOperationCompletionCallback { tablet::WriteOperation* operation, const server::ClockPtr& clock, bool trace = false) - : tablet_peer_(std::move(tablet_peer)), context_(std::move(context)), response_(response), - operation_(operation), clock_(clock), include_trace_(trace) {} + : tablet_peer_(std::move(tablet_peer)), + context_(std::move(context)), + response_(response), + operation_(operation), + clock_(clock), + include_trace_(trace), + trace_(include_trace_ ? Trace::CurrentTrace() : nullptr) {} void operator()(Status status) const { VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status; @@ -423,8 +428,13 @@ class WriteOperationCompletionCallback { status = Status::OK(); } + TRACE("Write completing with status $0", yb::ToString(status)); + if (!status.ok()) { LOG(INFO) << tablet_peer_->LogPrefix() << "Write failed: " << status; + if (include_trace_ && trace_) { + response_->set_trace_buffer(trace_->DumpToString(true)); + } SetupErrorAndRespond(get_error(), status, context_.get()); return; } @@ -463,8 +473,8 @@ class WriteOperationCompletionCallback { } } - if (include_trace_ && Trace::CurrentTrace() != nullptr) { - response_->set_trace_buffer(Trace::CurrentTrace()->DumpToString(true)); + if (include_trace_ && trace_) { + response_->set_trace_buffer(trace_->DumpToString(true)); } response_->set_propagated_hybrid_time(clock_->Now().ToUint64()); context_->RespondSuccess(); @@ -482,6 +492,7 @@ class WriteOperationCompletionCallback { tablet::WriteOperation* const operation_; server::ClockPtr clock_; const bool include_trace_; + scoped_refptr trace_; }; // Checksums the scan result. diff --git a/src/yb/util/trace.cc b/src/yb/util/trace.cc index 0d5d0d0f39b2..aed8b00c1fac 100644 --- a/src/yb/util/trace.cc +++ b/src/yb/util/trace.cc @@ -56,6 +56,10 @@ DEFINE_int32(tracing_level, 0, "verbosity levels (like --v) up to which tracing TAG_FLAG(tracing_level, advanced); TAG_FLAG(tracing_level, runtime); +DEFINE_int32(print_nesting_levels, 5, "controls the depth of the child traces to be printed."); +TAG_FLAG(print_nesting_levels, advanced); +TAG_FLAG(print_nesting_levels, runtime); + namespace yb { using strings::internal::SubstituteArg; @@ -64,6 +68,8 @@ __thread Trace* Trace::threadlocal_trace_; namespace { +const char* kNestedChildPrefix = ".. "; + // Get the part of filepath after the last path separator. // (Doesn't modify filepath, contrary to basename() in libgen.h.) // Borrowed from glog. @@ -74,24 +80,29 @@ const char* const_basename(const char* filepath) { template void DumpChildren( - std::ostream* out, const std::string& prefix, bool include_time_deltas, - const Children* children) { + std::ostream* out, int32_t tracing_depth, bool include_time_deltas, const Children* children) { + if (tracing_depth > GetAtomicFlag(&FLAGS_print_nesting_levels)) { + return; + } for (auto &child_trace : *children) { - *out << prefix << "Related trace:" << std::endl; - *out << child_trace->DumpToString(prefix, include_time_deltas); + for (int i = 0; i < tracing_depth; i++) { + *out << kNestedChildPrefix; + } + *out << "Related trace:" << std::endl; + *out << child_trace->DumpToString(tracing_depth, include_time_deltas); } } -void DumpChildren(std::ostream* out, const std::string& prefix, - bool include_time_deltas, std::nullptr_t children) { -} - -template -void DumpEntries(std::ostream* out, - const std::string& prefix, - bool include_time_deltas, - int64_t start, - const Entries& entries) { +void DumpChildren( + std::ostream* out, int32_t tracing_depth, bool include_time_deltas, std::nullptr_t children) {} + +template +void DumpEntries( + std::ostream* out, + int32_t tracing_depth, + bool include_time_deltas, + int64_t start, + const Entries& entries) { if (entries.empty()) { return; } @@ -110,7 +121,9 @@ void DumpEntries(std::ostream* out, struct tm tm_time; localtime_r(&secs_since_epoch, &tm_time); - *out << prefix; + for (int i = 0; i < tracing_depth; i++) { + *out << kNestedChildPrefix; + } // Log format borrowed from glog/logging.cc using std::setw; out->fill('0'); @@ -131,18 +144,19 @@ void DumpEntries(std::ostream* out, } } -template -void DoDump(std::ostream* out, - const std::string& prefix, - bool include_time_deltas, - int64_t start, - const Entries& entries, - Children children) { +template +void DoDump( + std::ostream* out, + int32_t tracing_depth, + bool include_time_deltas, + int64_t start, + const Entries& entries, + Children children) { // Save original flags. std::ios::fmtflags save_flags(out->flags()); - DumpEntries(out, prefix, include_time_deltas, start, entries); - DumpChildren(out, prefix + ".. ", include_time_deltas, children); + DumpEntries(out, tracing_depth, include_time_deltas, start, entries); + DumpChildren(out, tracing_depth + 1, include_time_deltas, children); // Restore stream flags. out->flags(save_flags); @@ -317,10 +331,10 @@ void Trace::AddEntry(TraceEntry* entry) { } void Trace::Dump(std::ostream *out, bool include_time_deltas) const { - Dump(out, "", include_time_deltas); + Dump(out, 0, include_time_deltas); } -void Trace::Dump(std::ostream* out, const std::string& prefix, bool include_time_deltas) const { +void Trace::Dump(std::ostream* out, int32_t tracing_depth, bool include_time_deltas) const { // Gather a copy of the list of entries under the lock. This is fast // enough that we aren't worried about stalling concurrent tracers // (whereas doing the logging itself while holding the lock might be @@ -340,16 +354,14 @@ void Trace::Dump(std::ostream* out, const std::string& prefix, bool include_time trace_start_time_usec = trace_start_time_usec_; } - DoDump(out, prefix, - include_time_deltas, - trace_start_time_usec, - entries | boost::adaptors::indirected, - &child_traces); + DoDump( + out, tracing_depth, include_time_deltas, trace_start_time_usec, + entries | boost::adaptors::indirected, &child_traces); } -string Trace::DumpToString(const std::string& prefix, bool include_time_deltas) const { +string Trace::DumpToString(int32_t tracing_depth, bool include_time_deltas) const { std::stringstream s; - Dump(&s, prefix, include_time_deltas); + Dump(&s, tracing_depth, include_time_deltas); return s.str(); } @@ -395,11 +407,10 @@ void PlainTrace::Trace(const char *file_path, int line_number, const char *messa } void PlainTrace::Dump(std::ostream *out, bool include_time_deltas) const { - Dump(out, "", include_time_deltas); + Dump(out, 0, include_time_deltas); } -void PlainTrace::Dump( - std::ostream* out, const std::string& prefix, bool include_time_deltas) const { +void PlainTrace::Dump(std::ostream* out, int32_t tracing_depth, bool include_time_deltas) const { size_t size; decltype(trace_start_time_usec_) trace_start_time_usec; { @@ -408,12 +419,14 @@ void PlainTrace::Dump( trace_start_time_usec = trace_start_time_usec_; } auto entries = boost::make_iterator_range(entries_, entries_ + size); - DoDump(out, prefix, include_time_deltas, trace_start_time_usec, entries, /* children */ nullptr); + DoDump( + out, tracing_depth, include_time_deltas, trace_start_time_usec, entries, + /* children */ nullptr); } -std::string PlainTrace::DumpToString(const std::string& prefix, bool include_time_deltas) const { +std::string PlainTrace::DumpToString(int32_t tracing_depth, bool include_time_deltas) const { std::stringstream s; - Dump(&s, prefix, include_time_deltas); + Dump(&s, tracing_depth, include_time_deltas); return s.str(); } diff --git a/src/yb/util/trace.h b/src/yb/util/trace.h index 1db01140734b..cf71a2de5254 100644 --- a/src/yb/util/trace.h +++ b/src/yb/util/trace.h @@ -82,6 +82,9 @@ DECLARE_int32(tracing_level); #define TRACE(format, substitutions...) \ VTRACE(0, (format), ##substitutions) +#define TRACE_FUNC() \ + TRACE(__func__) + // Like the above, but takes the trace pointer as an explicit argument. #define VTRACE_TO(level, trace, format, substitutions...) \ do { \ @@ -176,12 +179,12 @@ class Trace : public RefCountedThreadSafe { // If 'include_time_deltas' is true, calculates and prints the difference between // successive trace messages. void Dump(std::ostream* out, bool include_time_deltas) const; - void Dump(std::ostream* out, const std::string& prefix, bool include_time_deltas) const; + void Dump(std::ostream* out, int32_t tracing_depth, bool include_time_deltas) const; // Dump the trace buffer as a string. - std::string DumpToString(const std::string& prefix, bool include_time_deltas) const; + std::string DumpToString(int32_t tracing_depth, bool include_time_deltas) const; std::string DumpToString(bool include_time_deltas) const { - return DumpToString("", include_time_deltas); + return DumpToString(0, include_time_deltas); } // Attaches the given trace which will get appended at the end when Dumping. @@ -278,10 +281,10 @@ class PlainTrace { void Trace(const char* file_path, int line_number, const char* message); void Dump(std::ostream* out, bool include_time_deltas) const; - void Dump(std::ostream* out, const std::string& prefix, bool include_time_deltas) const; - std::string DumpToString(const std::string& prefix, bool include_time_deltas) const; + void Dump(std::ostream* out, int32_t tracing_depth, bool include_time_deltas) const; + std::string DumpToString(int32_t tracing_depth, bool include_time_deltas) const; std::string DumpToString(bool include_time_deltas) const { - return DumpToString("", include_time_deltas); + return DumpToString(0, include_time_deltas); } private: