Skip to content

Commit

Permalink
[#15856] YSQL: Pick read time on tserver for new statements in Read C…
Browse files Browse the repository at this point in the history
…ommitted isolation

Summary:
In Read Committed isolation, a new read time is picked for each statement
(i.e., a new logical snapshot of the database is used for each statement's
reads). This is done (in PgClientService) by setting the read time to the
current time at the start of each new statement before issuing requests to any
tserver. However, this might results in high latencies in the first read op that
is executed as part of that statement because the tablet serving the read
(likely on another node) might have to wait for the "safe" time to reach the
picked read time. A long wait for safe time is usually seen when there are
concurrent writes to the tablet and the read enters while the raft replication
that moves the safe time ahead is still in progress (see #11805).

This issue is avoided in Repeatable Read isolation because there, the first
tablet serving the read in a transaction is allowed to pick the read time as the
latest available "safe" time without having to wait for any catchup. This read
time is sent back to PgClientService as used_read_time so that future reads can
use the same read time. Note that even in Repeatable Read isolation, in case,
there are multiple parallel RPCs to various tservers, the read time is still
picked on the PgClientService because otherwise, the rpcs would have to wait for
one of them to execute and came back with a used_read_time.

This diff extends the same logic to Read Committed isolation.
Jira: DB-5248

Test Plan:
./yb_build.sh --java-test org.yb.pgsql.TestPgTransactions#testReadPointInReadCommittedIsolation
./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: dsrinivasan, gkukreja, yql, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D24075
  • Loading branch information
pkj415 committed Jul 7, 2023
1 parent 69b3ce6 commit b223af9
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 42 deletions.
94 changes: 58 additions & 36 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ DEFINE_RUNTIME_string(ysql_sequence_cache_method, "connection",
"Where sequence values are cached for both existing and new sequences. Valid values are "
"\"connection\" and \"server\"");

DEFINE_NON_RUNTIME_bool(ysql_rc_force_pick_read_time_on_pg_client, false,
"When resetting read time for a statement in Read Commited isolation level,"
" pick read time on the PgClientService instead of allowing the tserver to"
" pick one.");
TAG_FLAG(ysql_rc_force_pick_read_time_on_pg_client, advanced);

DECLARE_bool(ysql_serializable_isolation_for_ddl_txn);
DECLARE_bool(ysql_ddl_rollback_enabled);

Expand Down Expand Up @@ -983,25 +989,30 @@ Status PgClientSession::DoPerform(const DataPtr& data, CoarseTimePoint deadline,
return Status::OK();
}

void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) {
Result<PgClientSession::UsedReadTimePtr> PgClientSession::ProcessReadTimeManipulation(
ReadTimeManipulation manipulation) {
switch (manipulation) {
case ReadTimeManipulation::RESET: {
// If a txn_ has been created, session_->read_point() returns the read point stored in txn_.
ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
rp->SetCurrentReadTime();

VLOG(1) << "Setting current ht as read point " << rp->GetReadTime();
auto* read_point = Session(PgClientSessionKind::kPlain)->read_point();
if (FLAGS_ysql_rc_force_pick_read_time_on_pg_client ||
Transaction(PgClientSessionKind::kPlain)) {
// If a txn_ has been created, session_->read_point() returns the read point stored in
// txn_.
read_point->SetCurrentReadTime();
VLOG(1) << "Setting current ht as read point " << read_point->GetReadTime();
return PgClientSession::UsedReadTimePtr();
}
return VERIFY_RESULT(ResetReadPoint(PgClientSessionKind::kPlain));
}
return;
case ReadTimeManipulation::RESTART: {
ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
auto* rp = Session(PgClientSessionKind::kPlain)->read_point();
rp->Restart();

VLOG(1) << "Restarted read point " << rp->GetReadTime();
}
return;
return PgClientSession::UsedReadTimePtr();
case ReadTimeManipulation::NONE:
return;
return PgClientSession::UsedReadTimePtr();
case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_:
case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_:
break;
Expand Down Expand Up @@ -1085,37 +1096,29 @@ PgClientSession::SetupSession(

UsedReadTimePtr used_read_time;
if (options.restart_transaction()) {
if(options.ddl_mode()) {
return STATUS(NotSupported, "Not supported to restart DDL transaction");
}
RSTATUS_DCHECK(!options.ddl_mode(), NotSupported, "Restarting a DDL transaction not supported");
Transaction(kind) = VERIFY_RESULT(RestartTransaction(session, transaction));
transaction = Transaction(kind).get();
} else {
const auto has_time_manipulation =
options.read_time_manipulation() != ReadTimeManipulation::NONE;
RSTATUS_DCHECK(
kind == PgClientSessionKind::kPlain ||
options.read_time_manipulation() == ReadTimeManipulation::NONE,
IllegalState,
"Read time manipulation can't be specified for kDdl/ kCatalog transactions");
ProcessReadTimeManipulation(options.read_time_manipulation());
if (options.has_read_time() || options.use_catalog_session()) {
const auto read_time = options.has_read_time() && options.read_time().has_read_ht()
? ReadHybridTime::FromPB(options.read_time()) : ReadHybridTime();
!(has_time_manipulation && options.has_read_time()),
IllegalState, "read_time_manipulation and read_time fields can't be satisfied together");

if (has_time_manipulation) {
RSTATUS_DCHECK(
kind == PgClientSessionKind::kPlain, IllegalState,
"Read time manipulation can't be specified for non kPlain sessions");
used_read_time = VERIFY_RESULT(ProcessReadTimeManipulation(options.read_time_manipulation()));
} else if (options.has_read_time() && options.read_time().has_read_ht()) {
const auto read_time = ReadHybridTime::FromPB(options.read_time());
session->SetReadPoint(read_time);
if (read_time) {
VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
} else {
VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime();
}
} else if (!transaction &&
(options.ddl_mode() || txn_serial_no_ != options.txn_serial_no())) {
session->SetReadPoint(ReadHybridTime());
if (kind == PgClientSessionKind::kPlain) {
used_read_time = std::weak_ptr<UsedReadTime>(
std::shared_ptr<UsedReadTime>(shared_from_this(), &plain_session_used_read_time_));
std::lock_guard guard(plain_session_used_read_time_.lock);
plain_session_used_read_time_.value = ReadHybridTime();
}
VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime();
VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
} else if (options.has_read_time() ||
options.use_catalog_session() ||
(!transaction && (txn_serial_no_ != options.txn_serial_no()))) {
used_read_time = VERIFY_RESULT(ResetReadPoint(kind));
} else {
if (!transaction && kind == PgClientSessionKind::kPlain) {
RETURN_NOT_OK(CheckPlainSessionReadTime());
Expand Down Expand Up @@ -1159,6 +1162,25 @@ PgClientSession::SetupSession(
return std::make_pair(sessions_[to_underlying(kind)], used_read_time);
}

Result<PgClientSession::UsedReadTimePtr> PgClientSession::ResetReadPoint(PgClientSessionKind kind) {
auto& data = sessions_[to_underlying(kind)];
RSTATUS_DCHECK(
!data.transaction, IllegalState,
"Can't reset read time in case distributed transaction has started");
auto& session = *data.session;
session.SetReadPoint(ReadHybridTime());
VLOG_WITH_PREFIX(3) << "Reset read time: " << session.read_point()->GetReadTime();

UsedReadTimePtr used_read_time;
if (kind == PgClientSessionKind::kPlain) {
used_read_time = std::weak_ptr(
std::shared_ptr<UsedReadTime>(shared_from_this(), &plain_session_used_read_time_));
std::lock_guard guard(plain_session_used_read_time_.lock);
plain_session_used_read_time_.value = ReadHybridTime();
}
return used_read_time;
}

std::string PgClientSession::LogPrefix() {
return SessionLogPrefix(id_);
}
Expand Down
10 changes: 9 additions & 1 deletion src/yb/tserver/pg_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class PgClientSession : public std::enable_shared_from_this<PgClientSession> {
Status ProcessResponse(
const PgClientSessionOperations& operations, const PgPerformRequestPB& req,
PgPerformResponsePB* resp, rpc::RpcContext* context);
void ProcessReadTimeManipulation(ReadTimeManipulation manipulation);
Result<PgClientSession::UsedReadTimePtr> ProcessReadTimeManipulation(
ReadTimeManipulation manipulation);

client::YBClient& client();
client::YBSessionPtr& EnsureSession(PgClientSessionKind kind);
Expand Down Expand Up @@ -172,6 +173,13 @@ class PgClientSession : public std::enable_shared_from_this<PgClientSession> {
template <class DataPtr>
Status DoPerform(const DataPtr& data, CoarseTimePoint deadline, rpc::RpcContext* context);

// Resets the session's current read point.
//
// For kPlain sessions, also reset the plain session used read time since the tserver will pick a
// read time and send back as "used read time" in the response for use by future rpcs of the
// session.
Result<PgClientSession::UsedReadTimePtr> ResetReadPoint(PgClientSessionKind kind);

const uint64_t id_;
client::YBClient& client_;
scoped_refptr<ClockBase> clock_;
Expand Down
5 changes: 0 additions & 5 deletions src/yb/yql/pggate/pg_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,6 @@ class PgClient::Impl {
}
union_op.ref_write(&write_op.write_request());
}
if (op->read_time()) {
DCHECK(!req->options().has_isolation() ||
req->options().isolation() != IsolationLevel::SERIALIZABLE_ISOLATION);
op->read_time().AddToPB(req->mutable_options());
}
}
}

Expand Down
78 changes: 78 additions & 0 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,65 @@ bool IsReadOnly(const PgsqlOp& op) {
return op.is_read() && !IsValidRowMarkType(GetRowMarkType(op));
}

Result<ReadHybridTime> GetReadTime(const PgsqlOps& operations) {
ReadHybridTime read_time;
for (const auto& op : operations) {
if (op->read_time()) {
// TODO(#18127): Substitute LOG_IF(WARNING) with SCHECK.
// cumulative_add_cardinality_correction and cumulative_add_comprehensive_promotion in
// TestPgRegressThirdPartyExtensionsHll fail on this check.
LOG_IF(WARNING, read_time && read_time != op->read_time())
<< "Operations in a batch have different read times "
<< read_time << " vs " << op->read_time();
read_time = op->read_time();
}
}
return read_time;
}

// Helper function to chose read time with in_txn_limit from pair of read time.
// All components in read times except in_txn_limit must be equal.
// One of the read time must have in_txn_limit equal to HybridTime::kMax
// (i.e. must be default initialized)
Result<const ReadHybridTime&> ActualReadTime(
std::reference_wrapper<const ReadHybridTime> read_time1,
std::reference_wrapper<const ReadHybridTime> read_time2) {
if (read_time1 == read_time2) {
return read_time1.get();
}
const auto* read_time_with_in_txn_limit_max = &(read_time1.get());
const auto* read_time = &(read_time2.get());
if (read_time_with_in_txn_limit_max->in_txn_limit != HybridTime::kMax) {
std::swap(read_time_with_in_txn_limit_max, read_time);
}
SCHECK(
read_time_with_in_txn_limit_max->in_txn_limit == HybridTime::kMax,
InvalidArgument, "At least one read time with kMax in_txn_limit is expected");
auto tmp_read_time = *read_time;
tmp_read_time.in_txn_limit = read_time_with_in_txn_limit_max->in_txn_limit;
SCHECK(
tmp_read_time == *read_time_with_in_txn_limit_max,
InvalidArgument, "Ambiguous read time $0 $1", read_time1, read_time2);
return *read_time;
}

Status UpdateReadTime(tserver::PgPerformOptionsPB* options, const ReadHybridTime& read_time) {
ReadHybridTime options_read_time;
const auto* actual_read_time = &read_time;
if (options->has_read_time()) {
options_read_time = ReadHybridTime::FromPB(options->read_time());
// In case of follower reads a read time is set in the options, with the in_txn_limit set to
// kMax. But when fetching the next page, the ops_read_time might have a different
// in_txn_limit (received by the tserver with the first page).
// ActualReadTime will select appropriate read time (read time with in_txn_limit)
// and make necessary checks.

actual_read_time = &VERIFY_RESULT_REF(ActualReadTime(options_read_time, read_time));
}
actual_read_time->AddToPB(options);
return Status::OK();
}

} // namespace

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -616,6 +675,10 @@ Result<PerformFuture> PgSession::Perform(BufferableOperations&& ops, PerformOpti
if (pg_txn_manager_->IsTxnInProgress()) {
options.mutable_in_txn_limit_ht()->set_value(ops_options.in_txn_limit.ToUint64());
}
auto ops_read_time = VERIFY_RESULT(GetReadTime(ops.operations));
if (ops_read_time) {
RETURN_NOT_OK(UpdateReadTime(&options, ops_read_time));
}
ProcessPerformOnTxnSerialNo(txn_serial_no, ops_options.ensure_read_time_is_set, &options);
}
bool global_transaction = yb_force_global_transaction;
Expand Down Expand Up @@ -667,6 +730,21 @@ Result<PerformFuture> PgSession::Perform(BufferableOperations&& ops, PerformOpti
}
}

// Workaround for index backfill case:
//
// In case of index backfill, the read_time is set and is to be used for reading. However, if
// read committed isolation is enabled, the read_time_manipulation is also set to RESET for
// index backfill since it is a non-DDL statement.
//
// As a workaround, clear the read time manipulation to prefer read time over manipulation in
// case both are set. Remove after proper fix in context of GH #18080.
if (options.read_time_manipulation() != tserver::ReadTimeManipulation::NONE &&
options.has_read_time()) {
options.clear_read_time_manipulation();
}

DCHECK(!options.has_read_time() || options.isolation() != IsolationLevel::SERIALIZABLE_ISOLATION);

pg_client_.PerformAsync(&options, &ops.operations, [promise](const PerformResult& result) {
promise->set_value(result);
});
Expand Down

0 comments on commit b223af9

Please sign in to comment.