Skip to content

Commit

Permalink
[#16696] DocDB: Expose endpoint in pg_client_session to cancel a tran…
Browse files Browse the repository at this point in the history
…saction

Summary:
This diff introduces support for cancelling a transaction given its transaction id. Providing a status tablet id is left as optional.

When a status tablet id provided, we just check that tablet alone. When left empty, we check all the global txn status tablets and the local txn status tablets and try to cancel the txn. If we are unable to cancel the transaction for some reason, the `error` field in the `CancelTransactionResponsePB` is populated, which is returned back to pg as a status using the `status` field in `PgCancelTransactionResponsePB`.

Note: When a transaction undergoes promotion, there is a minor period in which both the old and the new status tablet assume they are responsible for the transaction. In such cases, where multiple tablets report cancelation statuses, any reported error(s) take precedence. So it might also happen that we actually abort the txn, but report an error to the client. Subsequent cancel calls should report a `NOT_FOUND` in that case. We do this to be on the safer side as reporting `ABORTED` while the txn may still be active is not at all acceptable.
Jira: DB-6068

Test Plan:
./yb_build.sh --cxx-test pgwrapper_pg_cancel_transaction-test
./yb_build.sh --cxx-test pgwrapper_pg_get_lock_status-test

Old plan, but still can test using the following
git cherry-pick commit basavaraj29@a864741 and basavaraj29@5ab76b0

create a cluster using the following
```
./bin/yb-ctl create --rf=3 --placement_info "cloud0.rack1.zone,cloud0.rack2.zone,cloud0.rack3.zone" --data_dir ~/yugabyte-data --tserver_flags 'ysql_num_shards_per_tserver=1,ysql_pg_conf_csv="statement_timeout=0",enable_wait_queues=true,enable_deadlock_detection=true,enable_intentsdb_page=true' --master_flags 'auto_create_local_transaction_tables=true,auto_promote_nonlocal_transactions_to_global=true,enable_ysql_tablespaces_for_placement=true,ysql_tablespace_info_refresh_secs=1,client_read_write_timeout_ms=60000'
```

```
CREATE TABLESPACE tablespace1 WITH (replica_placement='{"num_replicas": 1,"placement_blocks":[{"cloud": "cloud0","region": "rack1","zone": "zone","min_num_replicas": 1}]}');
CREATE TABLE test(k int PRIMARY KEY, v int);
INSERT INTO test(k, v) VALUES (0,0),(1,1);
CREATE TABLE foo(k int, v int) TABLESPACE tablespace1;
INSERT INTO foo(k, v) VALUES (0,0),(1,1);
```

operating on foo produces a local txn, and operating on test produces a global txn
```
build/latest/bin/yb-ts-cli cancel_transaction <txn_id> [<status_tablet_id>]
```

Reviewers: esheng, tvesely, rsami

Reviewed By: rsami

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D24476
  • Loading branch information
basavaraj29 committed Jun 14, 2023
1 parent 399340e commit 3097369
Show file tree
Hide file tree
Showing 16 changed files with 983 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4290,7 +4290,7 @@ Status Tablet::GetLockStatus(const std::set<TransactionId>& transaction_ids,
intent_iter->SeekToFirst();

if (transaction_ids.empty()) {
// If transaction_ids is not empty, iterate over all records in intents_db.
// If transaction_ids is empty, iterate over all records in intents_db.
while (intent_iter->Valid()) {
auto key = intent_iter->key();

Expand Down
95 changes: 65 additions & 30 deletions src/yb/tablet/transaction_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ DEFINE_test_flag(bool, disable_cleanup_applied_transactions, false,
DEFINE_test_flag(bool, disable_apply_committed_transactions, false,
"Should we disable the apply of committed transactions.");

DEFINE_test_flag(bool, mock_cancel_unhosted_transactions, false,
"When enabled, the flag alters the behavior of the txn coordinator to falsely "
"claim successful cancelation of transactions it does not actually host.");

DEFINE_test_flag(bool, fail_abort_request_with_try_again, false,
"When enabled, the txn coordinator responds to all abort transaction requests "
"with TryAgain error status, for the set of transactions it hosts.");

DEFINE_RUNTIME_uint32(external_transaction_apply_rpc_limit, 0,
"Limit on the number of outstanding APPLY external transaction rpcs sent to "
"involved tablets at a given time. If set to 0, the default is half "
Expand Down Expand Up @@ -1249,45 +1257,37 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()};
}

void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback) {
void Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) {
AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);

auto id = FullyDecodeTransactionId(transaction_id);
VLOG_WITH_PREFIX_AND_FUNC(4) << "transaction_id: " << id << ".";
if (!id.ok()) {
callback(id.status());
std::unique_lock<std::mutex> lock(managed_mutex_);
auto it = managed_transactions_.find(transaction_id);
if (it == managed_transactions_.end()) {
lock.unlock();
VLOG_WITH_PREFIX_AND_FUNC(4) << "transaction_id: " << transaction_id << " not found.";
callback(TransactionStatusResult::Aborted());
return;
}
Abort(*id, term, callback);

DoAbort(it, term, std::move(callback), std::move(lock));
}

void Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) {
PostponedLeaderActions actions;
{
std::unique_lock<std::mutex> lock(managed_mutex_);
auto it = managed_transactions_.find(transaction_id);
if (it == managed_transactions_.end()) {
lock.unlock();
VLOG_WITH_PREFIX_AND_FUNC(4) << "transaction_id: " << transaction_id << " not found.";
bool CancelTransactionIfFound(
const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) {

std::unique_lock<std::mutex> lock(managed_mutex_);
auto it = managed_transactions_.find(transaction_id);
if (it == managed_transactions_.end()) {
lock.unlock();
if (PREDICT_FALSE(FLAGS_TEST_mock_cancel_unhosted_transactions)) {
callback(TransactionStatusResult::Aborted());
return;
}
VLOG_WITH_PREFIX_AND_FUNC(4)
<< "transaction_id: " << transaction_id << " found, aborting now.";
postponed_leader_actions_.leader_term = term;
boost::optional<TransactionStatusResult> status;
managed_transactions_.modify(it, [&status, &callback](TransactionState& state) {
status = state.Abort(&callback);
});
if (callback) {
lock.unlock();
callback(*status);
return;
return true;
}
actions.Swap(&postponed_leader_actions_);
return false;
}

ExecutePostponedLeaderActions(&actions);
DoAbort(it, term, std::move(callback), std::move(lock));
return true;
}

size_t test_count_transactions() {
Expand Down Expand Up @@ -1799,6 +1799,36 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
}
}

void DoAbort(
ManagedTransactions::iterator it, int64_t term, TransactionAbortCallback callback,
std::unique_lock<std::mutex> lock) {
CHECK(it != managed_transactions_.end());
VLOG_WITH_PREFIX_AND_FUNC(4) << "transaction_id: " << it->id() << " found, aborting now.";

if (PREDICT_FALSE(FLAGS_TEST_fail_abort_request_with_try_again)) {
lock.unlock();
callback(STATUS_FORMAT(
TryAgain, "Test flag fail_abort_request_with_try_again is enabled."));
return;
}

PostponedLeaderActions actions;
postponed_leader_actions_.leader_term = term;
TransactionStatusResult status;
managed_transactions_.modify(it, [&status, &callback](TransactionState& state) {
status = state.Abort(&callback);
});
if (callback) {
lock.unlock();
callback(status);
return;
}
actions.Swap(&postponed_leader_actions_);
lock.unlock();

ExecutePostponedLeaderActions(&actions);
}

TransactionCoordinatorContext& context_;
Counter& expired_metric_;
const std::string log_prefix_;
Expand Down Expand Up @@ -1870,12 +1900,17 @@ Status TransactionCoordinator::GetStatus(
return impl_->GetStatus(transaction_ids, deadline, response);
}

void TransactionCoordinator::Abort(const std::string& transaction_id,
void TransactionCoordinator::Abort(const TransactionId& transaction_id,
int64_t term,
TransactionAbortCallback callback) {
impl_->Abort(transaction_id, term, std::move(callback));
}

bool TransactionCoordinator::CancelTransactionIfFound(
const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) {
return impl_->CancelTransactionIfFound(transaction_id, term, std::move(callback));
}

std::string TransactionCoordinator::DumpTransactions() {
return impl_->DumpTransactions();
}
Expand Down
8 changes: 7 additions & 1 deletion src/yb/tablet/transaction_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,13 @@ class TransactionCoordinator {
CoarseTimePoint deadline,
tserver::GetTransactionStatusResponsePB* response);

void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback);
void Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback);

// CancelTransactionIfFound returns true if the transaction is found in the list of managed txns
// at the coordinator, and invokes the callback with the cancelation status. If the txn isn't
// found, it returns false and the callback is not invoked.
bool CancelTransactionIfFound(
const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback);

std::string DumpTransactions();

Expand Down
14 changes: 14 additions & 0 deletions src/yb/tserver/pg_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ service PgClientService {
rpc CheckIfPitrActive(PgCheckIfPitrActiveRequestPB) returns (PgCheckIfPitrActiveResponsePB);
rpc GetTserverCatalogVersionInfo(PgGetTserverCatalogVersionInfoRequestPB)
returns (PgGetTserverCatalogVersionInfoResponsePB);
rpc CancelTransaction(PgCancelTransactionRequestPB) returns (PgCancelTransactionResponsePB);
}

message PgHeartbeatRequestPB {
Expand Down Expand Up @@ -649,3 +650,16 @@ message PgWaitForBackendsCatalogVersionResponsePB {
// version. If the field is -1, the counting is in-progress.
int32 num_lagging_backends = 2;
}

message PgCancelTransactionRequestPB {
bytes transaction_id = 1;

// Populating the field is optional. When populated, the cancel request is sent to the specified
// status tablet alone. If not, the request is forwarded to all tablet servers and is thereafter
// broadcast to all status tablets hosted by the tablet server.
bytes status_tablet_id = 2;
}

message PgCancelTransactionResponsePB {
AppStatusPB status = 1;
}
113 changes: 113 additions & 0 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/flags.h"
#include "yb/util/logging.h"
#include "yb/util/net/net_util.h"
#include "yb/util/random_util.h"
#include "yb/util/result.h"
Expand Down Expand Up @@ -156,6 +157,8 @@ class PgClientSessionLocker {
};

using LockablePgClientSessionPtr = std::shared_ptr<LockablePgClientSession>;
using RemoteTabletServerPtr = std::shared_ptr<client::internal::RemoteTabletServer>;
using client::internal::RemoteTabletPtr;

void GetTablePartitionList(const client::YBTablePtr& table, PgTablePartitionsPB* partition_list) {
const auto table_partition_list = table->GetVersionedPartitions();
Expand Down Expand Up @@ -490,6 +493,116 @@ class PgClientServiceImpl::Impl {
table_cache_.InvalidateAll(CoarseMonoClock::Now());
}

// Return the TabletServer hosting the specified status tablet.
std::future<Result<RemoteTabletServerPtr>> GetTServerHostingStatusTablet(
const TabletId& status_tablet_id, CoarseTimePoint deadline) {

return MakeFuture<Result<RemoteTabletServerPtr>>([&](auto callback) {
client().LookupTabletById(
status_tablet_id, /* table =*/ nullptr, master::IncludeInactive::kFalse,
master::IncludeDeleted::kFalse, deadline,
[&, status_tablet_id, callback] (const auto& lookup_result) {
if (!lookup_result.ok()) {
return callback(lookup_result.status());
}

auto& remote_tablet = *lookup_result;
if (!remote_tablet) {
return callback(STATUS_FORMAT(
InvalidArgument,
Format("Status tablet with id: $0 not found", status_tablet_id)));
}

if (!remote_tablet->LeaderTServer()) {
return callback(STATUS_FORMAT(
TryAgain, Format("Leader not found for tablet $0", status_tablet_id)));
}
const auto& permanent_uuid = remote_tablet->LeaderTServer()->permanent_uuid();
callback(client().GetRemoteTabletServer(permanent_uuid));
},
// Force a client cache refresh so as to not hit NOT_LEADER error.
client::UseCache::kFalse);
});
}

Result<std::vector<RemoteTabletServerPtr>> GetAllLiveTservers() {
std::vector<RemoteTabletServerPtr> remote_tservers;
std::vector<master::TSInformationPB> live_tservers;
RETURN_NOT_OK(tablet_server_.GetLiveTServers(&live_tservers));
for (const auto& live_ts : live_tservers) {
const auto& permanent_uuid = live_ts.tserver_instance().permanent_uuid();
remote_tservers.push_back(VERIFY_RESULT(client().GetRemoteTabletServer(permanent_uuid)));
}
return remote_tservers;
}

Status CancelTransaction(const PgCancelTransactionRequestPB& req,
PgCancelTransactionResponsePB* resp,
rpc::RpcContext* context) {
if (req.transaction_id().empty()) {
return STATUS_FORMAT(IllegalState,
"Transaction Id not provided in PgCancelTransactionRequestPB");
}
tserver::CancelTransactionRequestPB node_req;
node_req.set_transaction_id(req.transaction_id());

std::vector<RemoteTabletServerPtr> remote_tservers;
if (req.status_tablet_id().empty()) {
remote_tservers = VERIFY_RESULT(GetAllLiveTservers());
} else {
const auto& remote_ts = VERIFY_RESULT(GetTServerHostingStatusTablet(
req.status_tablet_id(), context->GetClientDeadline()).get());
remote_tservers.push_back(remote_ts);
node_req.set_status_tablet_id(req.status_tablet_id());
}

std::vector<std::future<Status>> status_future;
std::vector<std::shared_ptr<tserver::CancelTransactionResponsePB>>
node_resp(remote_tservers.size(), std::make_shared<tserver::CancelTransactionResponsePB>());

for (size_t i = 0 ; i < remote_tservers.size() ; i++) {
const auto& proxy = remote_tservers[i]->proxy();
std::shared_ptr<rpc::RpcController> controller;
status_future.push_back(
MakeFuture<Status>([&, controller](auto callback) {
proxy->CancelTransactionAsync(
node_req, node_resp[i].get(), controller.get(), [callback, controller] {
callback(controller->status());
});
}));
}

auto status = STATUS_FORMAT(NotFound, "Unable to cancel transaction");
resp->Clear();
for (size_t i = 0 ; i < status_future.size() ; i++) {
const auto& s = status_future[i].get();
if (!s.ok()) {
LOG(WARNING) << "CancelTransaction request to TS failed with status: " << s;
continue;
}

if (node_resp[i]->has_error()) {
// Errors take precedence over TransactionStatus::ABORTED statuses. This needs to be done to
// correctly handle cancelation requests of promoted txns. Ignore all NotFound statuses as
// we collate them, collect all other error types.
const auto& status_from_pb = StatusFromPB(node_resp[i]->error().status());
if (status_from_pb.IsNotFound()) {
continue;
}
status = status_from_pb.CloneAndAppend("\n").CloneAndAppend(status.message());
}

// One of the TServers reported successfull cancelation of the transaction. Reset the status
// if we haven't seen any errors other than NOT_FOUND from the remaining TServers.
if (status.IsNotFound()) {
status = Status::OK();
}
}

StatusToPB(status, resp->mutable_status());
return Status::OK();
}

#define PG_CLIENT_SESSION_METHOD_FORWARD(r, data, method) \
Status method( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)& req, \
Expand Down
1 change: 1 addition & 0 deletions src/yb/tserver/pg_client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class PgMutationCounter;
(CheckIfPitrActive) \
(GetTserverCatalogVersionInfo) \
(WaitForBackendsCatalogVersion) \
(CancelTransaction) \
/**/

class PgClientServiceImpl : public PgClientServiceIf {
Expand Down
Loading

0 comments on commit 3097369

Please sign in to comment.