[#12959] YSQL: Refactor code to disambiguate in txn limit and index backfill read time

Summary:
This refactor eases understanding of a larger change planned next, which will allow
a backend to switch back and forth between snapshots (i.e., consistent read points)
in READ COMMITTED isolation.

Until now, the read time required by index backfill and the in-txn limit logic shared
the same statement_read_time variable in the exec parameters. This is confusing and
could lead to bugs later. This diff separates the two: index backfill now uses its own
backfill_read_time field, while the in-txn limit logic uses statement_in_txn_limit.
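
A minimal sketch of the distinction (not part of the diff): the struct and helper
functions below are hypothetical stand-ins for PgExecParameters and its callers;
only the field names backfill_read_time, statement_in_txn_limit, and
is_index_backfill come from this change.

#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, abridged version of the exec parameters after this change. */
typedef struct SketchExecParams {
  bool is_index_backfill;
  uint64_t backfill_read_time;       /* fixed read time, used only by index backfill */
  uint64_t *statement_in_txn_limit;  /* limit shared by all reads of one statement */
} SketchExecParams;

/* Backfill path (cf. index.c): the backfill request supplies a fixed read time. */
void setup_for_backfill(SketchExecParams *p, uint64_t backfill_read_ht) {
  p->is_index_backfill = true;
  p->backfill_read_time = backfill_read_ht;  /* previously went through *statement_read_time */
}

/* Regular statement path (cf. execUtils.c): all DocDB read ops of one statement share
 * a single pointer; the first read fills in the value, and later reads must not see
 * data written past it (e.g., the statement's own writes). */
void setup_for_statement(SketchExecParams *p, uint64_t *es_in_txn_limit_ht) {
  p->statement_in_txn_limit = es_in_txn_limit_ht;  /* *es_in_txn_limit_ht starts at 0 */
}

Keeping the backfill time as a plain value and the in-txn limit as a pointer mirrors
the diff: the backfill time is known up front and never changes, while the limit is
discovered by the first read and must then be visible to every later read of the
statement.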

Test Plan:
./yb_build.sh --cxx-test pgwrapper_pg_index_backfill-test --gtest_filter PgIndexBackfillTest.CreateUniqueIndexWriteAfterSafeTime
./yb_build.sh --cxx-test pgwrapper_pg_index_backfill-test --gtest_filter PgIndexBackfillTest.ReadTime

Reviewers: jason, dmitry

Reviewed By: dmitry

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D17770
pkj415 committed Jun 27, 2022
1 parent 7ab10bc commit 893cb65
Showing 9 changed files with 68 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/postgres/src/backend/access/yb_access/yb_scan.c
@@ -2261,7 +2261,7 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy w
exec_params.limit_count = 1;
exec_params.rowmark = mode;
exec_params.wait_policy = wait_policy;
- exec_params.statement_read_time = estate->yb_exec_params.statement_read_time;
+ exec_params.statement_in_txn_limit = estate->yb_exec_params.statement_in_txn_limit;

HTSU_Result res = HeapTupleMayBeUpdated;
MemoryContext exec_context = GetCurrentMemoryContext();
2 changes: 1 addition & 1 deletion src/postgres/src/backend/catalog/index.c
@@ -2806,7 +2806,7 @@ IndexBuildHeapRangeScanInternal(Relation heapRelation,
{
if (bfinfo->bfinstr)
exec_params->bfinstr = pstrdup(bfinfo->bfinstr);
- *exec_params->statement_read_time = bfinfo->read_time;
+ exec_params->backfill_read_time = bfinfo->read_time;
exec_params->partition_key = pstrdup(bfinfo->row_bounds->partition_key);
exec_params->out_param = bfresult;
exec_params->is_index_backfill = true;
14 changes: 8 additions & 6 deletions src/postgres/src/backend/executor/execUtils.c
@@ -174,24 +174,26 @@ CreateExecutorState(void)
estate->yb_es_is_fk_check_disabled = false;
estate->yb_conflict_slot = NULL;
/*
- * The read hybrid time used for this query. This will be initialized
- * by the first read operation invoked for this query, and all later
- * reads performed by this query will not read any data written past this time.
+ * The in txn limit used for this query. This will be initialized by the first
+ * read operation invoked for this query, and all later reads performed by
+ * this query will not read any data written past this time.
+ *
+ * TODO: Each query can have multiple "EState"s. Fix logic for such cases.
*/
- estate->yb_es_read_ht = 0;
+ estate->yb_es_in_txn_limit_ht = 0;

estate->yb_exec_params.limit_count = 0;
estate->yb_exec_params.limit_offset = 0;
estate->yb_exec_params.limit_use_default = true;
estate->yb_exec_params.rowmark = -1;
estate->yb_exec_params.is_index_backfill = false;
/*
- * Pointer to the query read hybrid time. This pointer is passed
+ * Pointer to the query's in txn limit. This pointer is passed
* down to all the DocDB read operations invoked for this query. Only
* the first read operation initializes its value, and all the other
* operations ensure that they don't read any value written past this time.
*/
- estate->yb_exec_params.statement_read_time = &estate->yb_es_read_ht;
+ estate->yb_exec_params.statement_in_txn_limit = &estate->yb_es_in_txn_limit_ht;

return estate;
}
4 changes: 2 additions & 2 deletions src/postgres/src/include/nodes/execnodes.h
@@ -593,12 +593,12 @@ typedef struct EState
YBCPgExecParameters yb_exec_params;

/*
- * The read hybrid time used for this query. This value is initialized
+ * The in txn limit used for this query. This value is initialized
* to 0, and later updated by the first read operation initiated for this
* query. All later read operations are then ensured that they will never
* read any data written past this time.
*/
- uint64_t yb_es_read_ht;
+ uint64_t yb_es_in_txn_limit_ht;
} EState;

/*
30 changes: 16 additions & 14 deletions src/yb/yql/pggate/pg_doc_op.cc
@@ -196,8 +196,8 @@ Status PgDocResult::ProcessSparseSystemColumns(std::string *reservoir) {

//--------------------------------------------------------------------------------------------------

- PgDocResponse::PgDocResponse(PerformFuture future, uint64_t used_read_time)
- : holder_(PerformInfo{.future = std::move(future), .used_read_time = used_read_time}) {}
+ PgDocResponse::PgDocResponse(PerformFuture future, uint64_t in_txn_limit)
+ : holder_(PerformInfo{.future = std::move(future), .in_txn_limit = in_txn_limit}) {}

PgDocResponse::PgDocResponse(ProviderPtr provider)
: holder_(std::move(provider)) {}
@@ -211,7 +211,7 @@ bool PgDocResponse::Valid() const {
Result<PgDocResponse::Data> PgDocResponse::Get() {
if (std::holds_alternative<PerformInfo>(holder_)) {
auto& info = std::get<PerformInfo>(holder_);
- return Data(VERIFY_RESULT(info.future.Get()), info.used_read_time);
+ return Data(VERIFY_RESULT(info.future.Get()), info.in_txn_limit);
}
// Detach provider pointer after first usage to make PgDocResponse::Valid return false.
ProviderPtr provider;
@@ -357,7 +357,7 @@ Status PgDocOp::SendRequestImpl(bool force_non_bufferable) {
size_t send_count = std::min(parallelism_level_, active_op_count_);
response_ = VERIFY_RESULT(sender_(
pg_session_.get(), pgsql_ops_.data(), send_count,
- *table_, GetReadTime(), force_non_bufferable));
+ *table_, GetInTxnLimit(), force_non_bufferable));
return Status::OK();
}

@@ -381,7 +381,7 @@ Result<std::list<PgDocResult>> PgDocOp::ProcessResponseImpl(
}
const auto& data = *response;
auto result = VERIFY_RESULT(ProcessCallResponse(*data.response));
- GetReadTime() = data.used_read_time;
+ GetInTxnLimit() = data.in_txn_limit;
RETURN_NOT_OK(CompleteProcessResponse());
return result;
}
@@ -445,9 +445,9 @@ Result<std::list<PgDocResult>> PgDocOp::ProcessCallResponse(const rpc::CallRespo
return result;
}

- uint64_t& PgDocOp::GetReadTime() {
- return (read_time_ || !exec_params_.statement_read_time)
- ? read_time_ : *exec_params_.statement_read_time;
+ uint64_t& PgDocOp::GetInTxnLimit() {
+ return exec_params_.statement_in_txn_limit ? *exec_params_.statement_in_txn_limit
+ : in_txn_limit_;
}

Status PgDocOp::CreateRequests() {
@@ -474,10 +474,10 @@ Status PgDocOp::CompleteRequests() {

Result<PgDocResponse> PgDocOp::DefaultSender(
PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table,
- uint64_t read_time, bool force_non_bufferable) {
+ uint64_t in_txn_limit, bool force_non_bufferable) {
auto result = VERIFY_RESULT(session->RunAsync(
- ops, ops_count, table, &read_time, force_non_bufferable));
- return PgDocResponse(std::move(result), read_time);
+ ops, ops_count, table, &in_txn_limit, force_non_bufferable));
+ return PgDocResponse(std::move(result), in_txn_limit);
}

//-------------------------------------------------------------------------------------------------
@@ -509,7 +509,7 @@ Status PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) {
SetRequestPrefetchLimit();
SetBackfillSpec();
SetRowMark();
- SetReadTime();
+ SetReadTimeForBackfill();
return Status::OK();
}

@@ -1032,10 +1032,12 @@ void PgDocReadOp::SetBackfillSpec() {
}
}

- void PgDocReadOp::SetReadTime() {
+ void PgDocReadOp::SetReadTimeForBackfill() {
if (exec_params_.is_index_backfill) {
read_op_->read_request().set_is_for_backfill(true);
- read_op_->set_read_time(ReadHybridTime::FromUint64(GetReadTime()));
+ // TODO: Change to RSTATUS_DCHECK
+ DCHECK(exec_params_.backfill_read_time);
+ read_op_->set_read_time(ReadHybridTime::FromUint64(exec_params_.backfill_read_time));
}
}

34 changes: 20 additions & 14 deletions src/yb/yql/pggate/pg_doc_op.h
@@ -212,11 +212,11 @@ class PgDocResult {
class PgDocResponse {
public:
struct Data {
- Data(const rpc::CallResponsePtr& response_, uint64_t used_read_time_)
- : response(response_), used_read_time(used_read_time_) {
+ Data(const rpc::CallResponsePtr& response_, uint64_t in_txn_limit_)
+ : response(response_), in_txn_limit(in_txn_limit_) {
}
rpc::CallResponsePtr response;
- uint64_t used_read_time;
+ uint64_t in_txn_limit;
};

class Provider {
@@ -228,7 +228,7 @@ class PgDocResponse {
using ProviderPtr = std::unique_ptr<Provider>;

PgDocResponse() = default;
- PgDocResponse(PerformFuture future, uint64_t used_read_time);
+ PgDocResponse(PerformFuture future, uint64_t in_txn_limit);
explicit PgDocResponse(ProviderPtr provider);

bool Valid() const;
@@ -237,7 +237,7 @@
private:
struct PerformInfo {
PerformFuture future;
- uint64_t used_read_time;
+ uint64_t in_txn_limit;
};
std::variant<PerformInfo, ProviderPtr> holder_;
};
@@ -307,7 +307,7 @@ class PgDocOp : public std::enable_shared_from_this<PgDocOp> {
const PgTable& table() const { return table_; }

protected:
- uint64_t& GetReadTime();
+ uint64_t& GetInTxnLimit();

// Populate Protobuf requests using the collected information for this DocDB operator.
virtual Result<bool> DoCreateRequests() = 0;
@@ -337,8 +337,6 @@ class PgDocOp : public std::enable_shared_from_this<PgDocOp> {
// Clone READ or WRITE "template_op_" into new operators.
virtual PgsqlOpPtr CloneFromTemplate() = 0;

- void SetReadTime();
-
private:
Status SendRequest(bool force_non_bufferable);

@@ -356,16 +354,24 @@ class PgDocOp : public std::enable_shared_from_this<PgDocOp> {

static Result<PgDocResponse> DefaultSender(
PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table,
- uint64_t read_time, bool force_non_bufferable);
+ uint64_t in_txn_limit, bool force_non_bufferable);

//----------------------------------- Data Members -----------------------------------------------
protected:
// Session control.
PgSession::ScopedRefPtr pg_session_;

- // Operation time. This time is set at the start and must stay the same for the lifetime of the
- // operation to ensure that it is operating on one snapshot.
- uint64_t read_time_ = 0;
+ // This time is set at the start (i.e., before sending the first batch of PgsqlOp ops) and must
+ // stay the same for the lifetime of the PgDocOp.
+ //
+ // Each query must only see data written by earlier queries in the same transaction, not data
+ // written by itself. Setting it at the start ensures that future operations of the PgDocOp only
+ // see data written by previous queries.
+ //
+ // NOTE: Each query might result in many PgDocOps. So using 1 in_txn_limit_ per PgDocOp is not
+ // enough. The same should be used across all PgDocOps in the query. This is ensured by the use
+ // of statement_in_txn_limit in yb_exec_params of EState.
+ uint64_t in_txn_limit_ = 0;

// Target table.
PgTable& table_;
@@ -533,8 +539,8 @@ class PgDocReadOp : public PgDocOp {
// Set the row_mark_type field of our read request based on our exec control parameter.
void SetRowMark();

- // Set the read_time for our read request based on our exec control parameter.
- void SetReadTime();
+ // Set the read_time for our backfill's read request based on our exec control parameter.
+ void SetReadTimeForBackfill();

// Clone the template into actual requests to be sent to server.
PgsqlOpPtr CloneFromTemplate() override {
12 changes: 6 additions & 6 deletions src/yb/yql/pggate/pg_session.cc
@@ -201,7 +201,7 @@ class PgSession::RunHelper {

Status Apply(const PgTableDesc& table,
const PgsqlOpPtr& op,
- uint64_t* read_time,
+ uint64_t* in_txn_limit,
bool force_non_bufferable) {
auto& buffer = pg_session_.buffer_;
// Try buffering this operation if it is a write operation, buffering is enabled and no
@@ -222,11 +222,11 @@ class PgSession::RunHelper {
IllegalState,
"Buffered operations must be flushed before applying first non-bufferable operation");
// Buffered operations can't be combined within single RPC with non bufferable operation
- // in case non bufferable operation has preset read_time.
+ // in case non bufferable operation has preset in_txn_limit.
// Buffered operations must be flushed independently in this case.
// Also operations for catalog session can be combined with buffered operations
// as catalog session is used for read-only operations.
- if ((IsTransactional() && read_time && *read_time) || IsCatalog()) {
+ if ((IsTransactional() && in_txn_limit && *in_txn_limit) || IsCatalog()) {
RETURN_NOT_OK(buffer.Flush());
} else {
operations_ = VERIFY_RESULT(buffer.FlushTake(table, *op, IsTransactional()));
@@ -252,7 +252,7 @@ class PgSession::RunHelper {
read_only = read_only && !IsValidRowMarkType(row_mark_type);

return pg_session_.pg_txn_manager_->CalculateIsolation(
- read_only, txn_priority_requirement, read_time);
+ read_only, txn_priority_requirement, in_txn_limit);
}

Result<PerformFuture> Flush() {
@@ -744,7 +744,7 @@ Status PgSession::ValidatePlacement(const string& placement_info) {
}

Result<PerformFuture> PgSession::RunAsync(
- const OperationGenerator& generator, uint64_t* read_time, bool force_non_bufferable) {
+ const OperationGenerator& generator, uint64_t* in_txn_limit, bool force_non_bufferable) {
auto table_op = generator();
SCHECK(table_op.operation, IllegalState, "Operation list must not be empty");
const auto* table = table_op.table;
@@ -763,7 +763,7 @@ Result<PerformFuture> PgSession::RunAsync(
IllegalState,
"Operations on different sessions can't be mixed");
has_write_ops_in_ddl_mode_ = has_write_ops_in_ddl_mode_ || (ddl_mode && !IsReadOnly(**op));
- RETURN_NOT_OK(runner.Apply(*table, *op, read_time, force_non_bufferable));
+ RETURN_NOT_OK(runner.Apply(*table, *op, in_txn_limit, force_non_bufferable));
}
return runner.Flush();
}
20 changes: 10 additions & 10 deletions src/yb/yql/pggate/pggate.cc
@@ -190,21 +190,21 @@ class PrecastRequestSender {
public:
Result<PgDocResponse> Send(
PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table,
- uint64_t read_time, bool force_non_bufferable) {
+ uint64_t in_txn_limit, bool force_non_bufferable) {
if (!collecting_mode_) {
auto future = VERIFY_RESULT(session->RunAsync(
- ops, ops_count, table, &read_time, force_non_bufferable));
- return PgDocResponse(std::move(future), read_time);
+ ops, ops_count, table, &in_txn_limit, force_non_bufferable));
+ return PgDocResponse(std::move(future), in_txn_limit);
}
- // For now PrecastRequestSender can work with zero read time only.
- // Zero read time means that current time should be used as read time.
- RSTATUS_DCHECK(!read_time, IllegalState, "Only zero read time is expected");
+ // For now PrecastRequestSender can work with zero in txn limit only.
+ // Zero read time means that current time should be used as in txn limit.
+ RSTATUS_DCHECK(!in_txn_limit, IllegalState, "Only zero read time is expected");
for (auto end = ops + ops_count; ops != end; ++ops) {
ops_.emplace_back(*ops, table);
}
if (!provider_state_) {
provider_state_ = std::make_shared<ResponseProvider::State>(
- rpc::CallResponsePtr(), 0 /* used_read_time */);
+ rpc::CallResponsePtr(), 0 /* in_txn_limit */);
}
return PgDocResponse(std::make_unique<ResponseProvider>(provider_state_));
}
@@ -231,7 +231,7 @@ class PrecastRequestSender {
}
auto& info = *i++;
return PgSession::TableOperation{.operation = &info.operation, .table = info.table};
- }), &provider_state_->used_read_time, false /* force_non_bufferable */));
+ }), &provider_state_->in_txn_limit, false /* force_non_bufferable */));
provider_state_->response = VERIFY_RESULT(perform_future.Get());
return Status::OK();
}
@@ -262,8 +262,8 @@ Status FetchExistingYbctids(PgSession::ScopedRefPtr session,
boost::container::small_vector<std::unique_ptr<PgDocReadOp>, 16> doc_ops;
auto request_sender = [&precast_sender](
PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table,
- uint64_t read_time, bool force_non_bufferable) {
- return precast_sender.Send(session, ops, ops_count, table, read_time, force_non_bufferable);
+ uint64_t in_txn_limit, bool force_non_bufferable) {
+ return precast_sender.Send(session, ops, ops_count, table, in_txn_limit, force_non_bufferable);
};
// Start all the doc_ops to read from docdb in parallel, one doc_op per table ID.
// Each doc_op will use request_sender to send all the requests with single perform RPC.
6 changes: 4 additions & 2 deletions src/yb/yql/pggate/ybc_pg_typedefs.h
@@ -295,7 +295,8 @@ typedef struct PgExecParameters {
int rowmark = -1;
int wait_policy = 2; // Cast to yb::WaitPolicy for C++ use. (2 is for yb::WAIT_ERROR)
char *bfinstr = NULL;
- uint64_t* statement_read_time = NULL;
+ uint64_t backfill_read_time = 0;
+ uint64_t* statement_in_txn_limit = NULL;
char *partition_key = NULL;
PgExecOutParam *out_param = NULL;
bool is_index_backfill = false;
@@ -306,7 +307,8 @@
int rowmark;
int wait_policy; // Cast to LockWaitPolicy for C use
char *bfinstr;
- uint64_t* statement_read_time;
+ uint64_t backfill_read_time;
+ uint64_t* statement_in_txn_limit;
char *partition_key;
PgExecOutParam *out_param;
bool is_index_backfill;
