From eed826d77b1198f0684c76b610bdf989678034f8 Mon Sep 17 00:00:00 2001 From: Dmitry Uspenskiy <47734295+d-uspenskiy@users.noreply.github.com> Date: Mon, 9 Sep 2024 13:07:06 +0300 Subject: [PATCH] [#22519] YSQL: Move ExplicitRowLockBuffer class into separate file Summary: Move the `ExplicitRowLockBuffer` class into newly created file `pg_explicit_row_lock_buffer.cc` Jira: DB-11445 Test Plan: Jenkins Reviewers: pjain, kramanathan, myang, telgersma Reviewed By: kramanathan Subscribers: yql Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D37797 --- src/yb/yql/pggate/CMakeLists.txt | 37 +-- .../yql/pggate/pg_explicit_row_lock_buffer.cc | 88 ++++++++ .../yql/pggate/pg_explicit_row_lock_buffer.h | 58 +++++ src/yb/yql/pggate/pg_session.cc | 78 ------- src/yb/yql/pggate/pg_session.h | 129 +---------- src/yb/yql/pggate/pg_tools.cc | 212 ++++++++++++++++++ src/yb/yql/pggate/pg_tools.h | 110 ++++++++- src/yb/yql/pggate/pggate.cc | 180 +-------------- 8 files changed, 488 insertions(+), 404 deletions(-) create mode 100644 src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc create mode 100644 src/yb/yql/pggate/pg_explicit_row_lock_buffer.h diff --git a/src/yb/yql/pggate/CMakeLists.txt b/src/yb/yql/pggate/CMakeLists.txt index 58a3eb1d1051..0f08e943ed2d 100644 --- a/src/yb/yql/pggate/CMakeLists.txt +++ b/src/yb/yql/pggate/CMakeLists.txt @@ -26,36 +26,37 @@ add_subdirectory(ysql_bench_metrics_handler) # PGGate API library. set(PGGATE_SRCS - ybc_pggate.cc - pggate.cc - pggate_thread_local_vars.cc + pg_callbacks.cc pg_client.cc - pg_op.cc - pg_operation_buffer.cc - pg_perform_future.cc - pg_sample.cc - pg_session.cc + pg_column.cc pg_ddl.cc pg_dml.cc - pg_dml_write.cc pg_dml_read.cc + pg_dml_write.cc + pg_doc_metrics.cc + pg_doc_op.cc + pg_explicit_row_lock_buffer.cc + pg_expr.cc pg_function.cc pg_function_helpers.cc - pg_type.cc + pggate.cc + pggate_thread_local_vars.cc + pg_memctx.cc + pg_op.cc + pg_operation_buffer.cc + pg_perform_future.cc + pg_sample.cc pg_select.cc pg_select_index.cc - pg_expr.cc - pg_column.cc - pg_doc_op.cc + pg_session.cc + pg_sys_table_prefetcher.cc pg_table.cc pg_tabledesc.cc + pg_tools.cc pg_txn_manager.cc + pg_type.cc pg_value.cc - pg_memctx.cc - pg_callbacks.cc - pg_tools.cc - pg_sys_table_prefetcher.cc - pg_doc_metrics.cc) + ybc_pggate.cc) ADD_YB_LIBRARY(yb_pggate_flags SRCS pggate_flags.cc diff --git a/src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc b/src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc new file mode 100644 index 000000000000..44df52a4b908 --- /dev/null +++ b/src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc @@ -0,0 +1,88 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/yql/pggate/pg_explicit_row_lock_buffer.h" + +#include +#include + +#include "yb/yql/pggate/util/ybc_util.h" +#include "yb/util/scope_exit.h" + +namespace yb::pggate { + +ExplicitRowLockBuffer::ExplicitRowLockBuffer( + TableYbctidVectorProvider& ybctid_container_provider, + std::reference_wrapper ybctid_reader) + : ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) { +} + +Status ExplicitRowLockBuffer::Add( + Info&& info, const LightweightTableYbctid& key, bool is_region_local) { + if (info_ && *info_ != info) { + RETURN_NOT_OK(DoFlush()); + } + if (!info_) { + info_.emplace(std::move(info)); + } else if (intents_.contains(key)) { + return Status::OK(); + } + + if (is_region_local) { + region_local_tables_.insert(key.table_id); + } + DCHECK(is_region_local || !region_local_tables_.contains(key.table_id)); + intents_.emplace(key.table_id, std::string(key.ybctid)); + return narrow_cast(intents_.size()) >= yb_explicit_row_locking_batch_size + ? DoFlush() : Status::OK(); +} + +Status ExplicitRowLockBuffer::Flush() { + return IsEmpty() ? Status::OK() : DoFlush(); +} + +Status ExplicitRowLockBuffer::DoFlush() { + DCHECK(!IsEmpty()); + auto scope = ScopeExit([this] { Clear(); }); + auto ybctids = ybctid_container_provider_.Get(); + auto initial_intents_size = intents_.size(); + ybctids->reserve(initial_intents_size); + for (auto it = intents_.begin(); it != intents_.end();) { + auto node = intents_.extract(it++); + ybctids->push_back(std::move(node.value())); + } + RETURN_NOT_OK(ybctid_reader_( + info_->database_id, ybctids, region_local_tables_, + make_lw_function( + [&info = *info_](PgExecParameters& params) { + params.rowmark = info.rowmark; + params.pg_wait_policy = info.pg_wait_policy; + params.docdb_wait_policy = info.docdb_wait_policy; + }))); + SCHECK( + initial_intents_size == ybctids->size(), NotFound, + "Some of the requested ybctids are missing"); + return Status::OK(); +} + +void ExplicitRowLockBuffer::Clear() { + intents_.clear(); + info_.reset(); + region_local_tables_.clear(); +} + +bool ExplicitRowLockBuffer::IsEmpty() const { + return !info_; +} + +} // namespace yb::pggate diff --git a/src/yb/yql/pggate/pg_explicit_row_lock_buffer.h b/src/yb/yql/pggate/pg_explicit_row_lock_buffer.h new file mode 100644 index 000000000000..ba462df61dd0 --- /dev/null +++ b/src/yb/yql/pggate/pg_explicit_row_lock_buffer.h @@ -0,0 +1,58 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include +#include + +#include "yb/gutil/macros.h" + +#include "yb/util/status.h" +#include "yb/util/lw_function.h" + +#include "yb/yql/pggate/pg_tools.h" + +namespace yb::pggate { + +class ExplicitRowLockBuffer { + public: + struct Info { + int rowmark; + int pg_wait_policy; + int docdb_wait_policy; + PgOid database_id; + + friend bool operator==(const Info&, const Info&) = default; + }; + + ExplicitRowLockBuffer( + TableYbctidVectorProvider& ybctid_container_provider, + std::reference_wrapper ybctid_reader); + Status Add( + Info&& info, const LightweightTableYbctid& key, bool is_region_local); + Status Flush(); + void Clear(); + bool IsEmpty() const; + + private: + Status DoFlush(); + + TableYbctidVectorProvider& ybctid_container_provider_; + const YbctidReader& ybctid_reader_; + TableYbctidSet intents_; + OidSet region_local_tables_; + std::optional info_; +}; + +} // namespace yb::pggate diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index 3308243d417c..da9f4e4918e9 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -17,12 +17,9 @@ #include #include -#include #include #include -#include - #include "yb/client/table_info.h" #include "yb/common/pg_types.h" @@ -42,7 +39,6 @@ #include "yb/util/format.h" #include "yb/util/logging.h" #include "yb/util/result.h" -#include "yb/util/scope_exit.h" #include "yb/util/status_format.h" #include "yb/util/string_util.h" @@ -388,80 +384,6 @@ class PgSession::RunHelper { const ForceFlushBeforeNonBufferableOp force_flush_before_non_bufferable_op_; }; -//-------------------------------------------------------------------------------------------------- -// Class TableYbctidHasher -//-------------------------------------------------------------------------------------------------- - -size_t TableYbctidHasher::operator()(const LightweightTableYbctid& value) const { - size_t hash = 0; - boost::hash_combine(hash, value.table_id); - boost::hash_range(hash, value.ybctid.begin(), value.ybctid.end()); - return hash; -} - -ExplicitRowLockBuffer::ExplicitRowLockBuffer( - TableYbctidVectorProvider& ybctid_container_provider, - std::reference_wrapper ybctid_reader) - : ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) { -} - -Status ExplicitRowLockBuffer::Add( - Info&& info, const LightweightTableYbctid& key, bool is_region_local) { - if (info_ && *info_ != info) { - RETURN_NOT_OK(DoFlush()); - } - if (!info_) { - info_.emplace(std::move(info)); - } else if (intents_.contains(key)) { - return Status::OK(); - } - - if (is_region_local) { - region_local_tables_.insert(key.table_id); - } - DCHECK(is_region_local || !region_local_tables_.contains(key.table_id)); - intents_.emplace(key.table_id, std::string(key.ybctid)); - return narrow_cast(intents_.size()) >= yb_explicit_row_locking_batch_size - ? DoFlush() : Status::OK(); -} - -Status ExplicitRowLockBuffer::Flush() { - return IsEmpty() ? Status::OK() : DoFlush(); -} - -Status ExplicitRowLockBuffer::DoFlush() { - DCHECK(!IsEmpty()); - auto scope = ScopeExit([this] { Clear(); }); - auto ybctids = ybctid_container_provider_.Get(); - auto initial_intents_size = intents_.size(); - ybctids->reserve(initial_intents_size); - for (auto it = intents_.begin(); it != intents_.end();) { - auto node = intents_.extract(it++); - ybctids->push_back(std::move(node.value())); - } - RETURN_NOT_OK(ybctid_reader_( - info_->database_id, ybctids, region_local_tables_, - make_lw_function( - [&info = *info_](PgExecParameters& params) { - params.rowmark = info.rowmark; - params.pg_wait_policy = info.pg_wait_policy; - params.docdb_wait_policy = info.docdb_wait_policy; - }))); - SCHECK(initial_intents_size == ybctids->size(), NotFound, - "Some of the requested ybctids are missing"); - return Status::OK(); -} - -void ExplicitRowLockBuffer::Clear() { - intents_.clear(); - info_.reset(); - region_local_tables_.clear(); -} - -bool ExplicitRowLockBuffer::IsEmpty() const { - return !info_; -} - //-------------------------------------------------------------------------------------------------- // Class PgSession //-------------------------------------------------------------------------------------------------- diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 8b66b91a319b..64b24375a2a9 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -13,13 +13,10 @@ #pragma once -#include #include #include -#include #include #include -#include #include #include @@ -39,6 +36,7 @@ #include "yb/yql/pggate/pg_client.h" #include "yb/yql/pggate/pg_doc_metrics.h" +#include "yb/yql/pggate/pg_explicit_row_lock_buffer.h" #include "yb/yql/pggate/pg_gate_fwd.h" #include "yb/yql/pggate/pg_operation_buffer.h" #include "yb/yql/pggate/pg_perform_future.h" @@ -53,131 +51,6 @@ YB_STRONGLY_TYPED_BOOL(InvalidateOnPgClient); YB_STRONGLY_TYPED_BOOL(UseCatalogSession); YB_STRONGLY_TYPED_BOOL(ForceNonBufferable); -struct LightweightTableYbctid { - LightweightTableYbctid(PgOid table_id_, const std::string_view& ybctid_) - : table_id(table_id_), ybctid(ybctid_) {} - LightweightTableYbctid(PgOid table_id_, const Slice& ybctid_) - : LightweightTableYbctid(table_id_, static_cast(ybctid_)) {} - - PgOid table_id; - std::string_view ybctid; -}; - -struct TableYbctid { - TableYbctid(PgOid table_id_, std::string ybctid_) - : table_id(table_id_), ybctid(std::move(ybctid_)) {} - - operator LightweightTableYbctid() const { - return LightweightTableYbctid(table_id, static_cast(ybctid)); - } - - PgOid table_id; - std::string ybctid; -}; - -struct MemoryOptimizedTableYbctid { - MemoryOptimizedTableYbctid(PgOid table_id_, std::string_view ybctid_) - : table_id(table_id_), - ybctid_size(static_cast(ybctid_.size())), - ybctid_data(new char[ybctid_size]) { - std::memcpy(ybctid_data.get(), ybctid_.data(), ybctid_size); - } - - operator LightweightTableYbctid() const { - return LightweightTableYbctid(table_id, std::string_view(ybctid_data.get(), ybctid_size)); - } - - PgOid table_id; - uint32_t ybctid_size; - std::unique_ptr ybctid_data; -}; - -static_assert( - sizeof(MemoryOptimizedTableYbctid) == 16 && - sizeof(MemoryOptimizedTableYbctid) < sizeof(TableYbctid)); - -struct TableYbctidComparator { - using is_transparent = void; - - bool operator()(const LightweightTableYbctid& l, const LightweightTableYbctid& r) const { - return l.table_id == r.table_id && l.ybctid == r.ybctid; - } -}; - -struct TableYbctidHasher { - using is_transparent = void; - - size_t operator()(const LightweightTableYbctid& value) const; -}; - -using OidSet = std::unordered_set; -template -using TableYbctidSetHelper = - std::unordered_set; -using MemoryOptimizedTableYbctidSet = TableYbctidSetHelper; -using TableYbctidSet = TableYbctidSetHelper; -using TableYbctidVector = std::vector; - -class TableYbctidVectorProvider { - public: - class Accessor { - public: - ~Accessor() { container_.clear(); } - TableYbctidVector* operator->() { return &container_; } - TableYbctidVector& operator*() { return container_; } - operator TableYbctidVector&() { return container_; } - - private: - explicit Accessor(TableYbctidVector* container) : container_(*container) {} - - friend class TableYbctidVectorProvider; - - TableYbctidVector& container_; - - DISALLOW_COPY_AND_ASSIGN(Accessor); - }; - - [[nodiscard]] Accessor Get() { return Accessor(&container_); } - - private: - TableYbctidVector container_; -}; - -using ExecParametersMutator = LWFunction; - -using YbctidReader = - std::function; - -class ExplicitRowLockBuffer { - public: - struct Info { - int rowmark; - int pg_wait_policy; - int docdb_wait_policy; - PgOid database_id; - - friend bool operator==(const Info&, const Info&) = default; - }; - - ExplicitRowLockBuffer( - TableYbctidVectorProvider& ybctid_container_provider, - std::reference_wrapper ybctid_reader); - Status Add( - Info&& info, const LightweightTableYbctid& key, bool is_region_local); - Status Flush(); - void Clear(); - bool IsEmpty() const; - - private: - Status DoFlush(); - - TableYbctidVectorProvider& ybctid_container_provider_; - const YbctidReader& ybctid_reader_; - TableYbctidSet intents_; - OidSet region_local_tables_; - std::optional info_; -}; - // This class is not thread-safe as it is mostly used by a single-threaded PostgreSQL backend // process. class PgSession : public RefCountedThreadSafe { diff --git a/src/yb/yql/pggate/pg_tools.cc b/src/yb/yql/pggate/pg_tools.cc index 359e376f2dfb..c901686ac057 100644 --- a/src/yb/yql/pggate/pg_tools.cc +++ b/src/yb/yql/pggate/pg_tools.cc @@ -15,12 +15,134 @@ #include "yb/yql/pggate/pg_tools.h" +#include +#include + +#include +#include + +#include "yb/common/pg_system_attr.h" + +#include "yb/util/memory/arena.h" +#include "yb/util/result.h" + +#include "yb/yql/pggate/pg_doc_op.h" +#include "yb/yql/pggate/pg_session.h" +#include "yb/yql/pggate/pg_table.h" + #include "yb/yql/pggate/ybc_pg_typedefs.h" DECLARE_uint32(TEST_yb_ash_sleep_at_wait_state_ms); DECLARE_uint32(TEST_yb_ash_wait_code_to_sleep_at); namespace yb::pggate { +namespace { + +struct TableHolder { + explicit TableHolder(const PgTableDescPtr& descr) : table_(descr) {} + PgTable table_; +}; + +class PgsqlReadOpWithPgTable : private TableHolder, public PgsqlReadOp { + public: + PgsqlReadOpWithPgTable(ThreadSafeArena* arena, const PgTableDescPtr& descr, bool is_region_local, + PgsqlMetricsCaptureType metrics_capture) + : TableHolder(descr), PgsqlReadOp(arena, *table_, is_region_local, metrics_capture) {} + + PgTable& table() { + return table_; + } +}; + +// Helper class to collect operations from multiple doc_ops and send them with a single perform RPC. +class PrecastRequestSender { + // Struct stores operation and table for futher sending this operation + // with the 'PgSession::RunAsync' method. + struct OperationInfo { + OperationInfo(const PgsqlOpPtr& operation_, const PgTableDesc& table_) + : operation(operation_), table(&table_) {} + PgsqlOpPtr operation; + const PgTableDesc* table; + }; + + class ResponseProvider : public PgDocResponse::Provider { + public: + // Shared state among different instances of the 'PgDocResponse' object returned by the 'Send' + // method. Response field will be initialized when all collected operations will be sent by the + // call of 'TransmitCollected' method. + using State = PgDocResponse::Data; + using StatePtr = std::shared_ptr; + + explicit ResponseProvider(const StatePtr& state) + : state_(state) {} + + Result Get() override { + SCHECK(state_->response, IllegalState, "Response is not set"); + return *state_; + } + + private: + StatePtr state_; + }; + + public: + Result Send( + PgSession& session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table, + HybridTime in_txn_limit) { + if (!collecting_mode_) { + return PgDocOp::DefaultSender( + &session, ops, ops_count, table, in_txn_limit, + ForceNonBufferable::kFalse, IsForWritePgDoc::kFalse); + } + // For now PrecastRequestSender can work only with a new in txn limit set to the current time + // for each batch of ops. It doesn't use a single in txn limit for all read ops in a statement. + // TODO: Explain why is this the case because it differs from requirement 1 in + // src/yb/yql/pggate/README + RSTATUS_DCHECK(!in_txn_limit, IllegalState, "Only zero is expected"); + for (auto end = ops + ops_count; ops != end; ++ops) { + ops_.emplace_back(*ops, table); + } + if (!provider_state_) { + provider_state_ = std::make_shared(); + } + return PgDocResponse(std::make_unique(provider_state_)); + } + + Status TransmitCollected(PgSession& session) { + auto res = DoTransmitCollected(session); + ops_.clear(); + provider_state_.reset(); + return res; + } + + void DisableCollecting() { + DCHECK(ops_.empty()); + collecting_mode_ = false; + } + + private: + Status DoTransmitCollected(PgSession& session) { + auto i = ops_.begin(); + PgDocResponse response(VERIFY_RESULT(session.RunAsync(make_lw_function( + [&i, end = ops_.end()] { + using TO = PgSession::TableOperation; + if (i == end) { + return TO(); + } + auto& info = *i++; + return TO{.operation = &info.operation, .table = info.table}; + }), HybridTime())), + {TableType::USER, IsForWritePgDoc::kFalse}); + *provider_state_ = VERIFY_RESULT(response.Get(session)); + return Status::OK(); + } + + bool collecting_mode_ = true; + ResponseProvider::StatePtr provider_state_; + boost::container::small_vector ops_; +}; + +} // namespace RowMarkType GetRowMarkType(const PgExecParameters* exec_params) { return exec_params && exec_params->rowmark > -1 @@ -42,4 +164,94 @@ PgWaitEventWatcher::~PgWaitEventWatcher() { starter_(prev_wait_event_); } +MemoryOptimizedTableYbctid::MemoryOptimizedTableYbctid(PgOid table_id_, std::string_view ybctid_) + : table_id(table_id_), + ybctid_size(static_cast(ybctid_.size())), + ybctid_data(new char[ybctid_size]) { + std::memcpy(ybctid_data.get(), ybctid_.data(), ybctid_size); +} + +MemoryOptimizedTableYbctid::operator LightweightTableYbctid() const { + return LightweightTableYbctid(table_id, std::string_view(ybctid_data.get(), ybctid_size)); +} + +size_t TableYbctidHasher::operator()(const LightweightTableYbctid& value) const { + size_t hash = 0; + boost::hash_combine(hash, value.table_id); + boost::hash_range(hash, value.ybctid.begin(), value.ybctid.end()); + return hash; +} + +Status FetchExistingYbctids(const PgSession::ScopedRefPtr& session, + PgOid database_id, + TableYbctidVector& ybctids, + const OidSet& region_local_tables, + const ExecParametersMutator& exec_params_mutator) { + // Group the items by the table ID. + std::sort(ybctids.begin(), ybctids.end(), [](const auto& a, const auto& b) { + return a.table_id < b.table_id; + }); + + auto arena = std::make_shared(); + + PrecastRequestSender precast_sender; + boost::container::small_vector, 16> doc_ops; + auto request_sender = [&precast_sender]( + PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table, + HybridTime in_txn_limit, ForceNonBufferable force_non_bufferable, IsForWritePgDoc is_write) { + DCHECK(!force_non_bufferable); + DCHECK(!is_write); + return precast_sender.Send(*session, ops, ops_count, table, in_txn_limit); + }; + // Start all the doc_ops to read from docdb in parallel, one doc_op per table ID. + // Each doc_op will use request_sender to send all the requests with single perform RPC. + for (auto it = ybctids.begin(), end = ybctids.end(); it != end;) { + const auto table_id = it->table_id; + auto desc = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id))); + bool is_region_local = region_local_tables.find(table_id) != region_local_tables.end(); + auto metrics_capture = session->metrics().metrics_capture(); + auto read_op = std::make_shared( + arena.get(), desc, is_region_local, metrics_capture); + + auto* expr_pb = read_op->read_request().add_targets(); + expr_pb->set_column_id(to_underlying(PgSystemAttrNum::kYBTupleId)); + doc_ops.push_back(std::make_unique( + session, &read_op->table(), std::move(read_op), request_sender)); + auto& doc_op = *doc_ops.back(); + auto exec_params = doc_op.ExecParameters(); + exec_params_mutator(exec_params); + RETURN_NOT_OK(doc_op.ExecuteInit(&exec_params)); + // Populate doc_op with ybctids which belong to current table. + RETURN_NOT_OK(doc_op.PopulateByYbctidOps({make_lw_function([&it, table_id, end] { + return it != end && it->table_id == table_id ? Slice((it++)->ybctid) : Slice(); + }), static_cast(end - it)})); + RETURN_NOT_OK(doc_op.Execute()); + } + + RETURN_NOT_OK(precast_sender.TransmitCollected(*session)); + // Disable further request collecting as in the vast majority of cases new requests will not be + // initiated because requests for all ybctids has already been sent. But in case of dynamic + // splitting new requests might be sent. They will be sent and processed as usual (i.e. request + // of each doc_op will be sent individually). + precast_sender.DisableCollecting(); + // Collect the results from the docdb ops. + ybctids.clear(); + for (auto& it : doc_ops) { + for (;;) { + auto rowsets = VERIFY_RESULT(it->GetResult()); + if (rowsets.empty()) { + break; + } + for (auto& row : rowsets) { + RETURN_NOT_OK(row.ProcessSystemColumns()); + for (const auto& ybctid : row.ybctids()) { + ybctids.emplace_back(it->table()->relfilenode_id().object_oid, ybctid.ToBuffer()); + } + } + } + } + + return Status::OK(); +} + } // namespace yb::pggate diff --git a/src/yb/yql/pggate/pg_tools.h b/src/yb/yql/pggate/pg_tools.h index 0aec6f457b03..c26ad788cf6f 100644 --- a/src/yb/yql/pggate/pg_tools.h +++ b/src/yb/yql/pggate/pg_tools.h @@ -1,5 +1,5 @@ //-------------------------------------------------------------------------------------------------- -// Copyright (c) YugaByte, Inc. +// Copyright (c) YugabyteDB, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except // in compliance with the License. You may obtain a copy of the License at @@ -17,16 +17,32 @@ #pragma once +#include +#include +#include +#include +#include +#include +#include + #include "yb/ash/wait_state.h" +#include "yb/common/pg_types.h" #include "yb/common/transaction.pb.h" #include "yb/gutil/macros.h" +#include "yb/gutil/ref_counted.h" + +#include "yb/util/lw_function.h" +#include "yb/util/slice.h" +#include "yb/util/status.h" struct PgExecParameters; namespace yb::pggate { +class PgSession; + RowMarkType GetRowMarkType(const PgExecParameters* exec_params); struct Bound { @@ -56,4 +72,96 @@ struct EstimatedRowCount { double dead; }; +struct LightweightTableYbctid { + LightweightTableYbctid(PgOid table_id_, const std::string_view& ybctid_) + : table_id(table_id_), ybctid(ybctid_) {} + LightweightTableYbctid(PgOid table_id_, const Slice& ybctid_) + : LightweightTableYbctid(table_id_, static_cast(ybctid_)) {} + + PgOid table_id; + std::string_view ybctid; +}; + +struct TableYbctid { + TableYbctid(PgOid table_id_, std::string ybctid_) + : table_id(table_id_), ybctid(std::move(ybctid_)) {} + + operator LightweightTableYbctid() const { + return LightweightTableYbctid(table_id, static_cast(ybctid)); + } + + PgOid table_id; + std::string ybctid; +}; + +struct MemoryOptimizedTableYbctid { + MemoryOptimizedTableYbctid(PgOid table_id_, std::string_view ybctid_); + + operator LightweightTableYbctid() const; + + PgOid table_id; + uint32_t ybctid_size; + std::unique_ptr ybctid_data; +}; + +static_assert( + sizeof(MemoryOptimizedTableYbctid) == 16 && + sizeof(MemoryOptimizedTableYbctid) < sizeof(TableYbctid)); + +struct TableYbctidComparator { + using is_transparent = void; + + bool operator()(const LightweightTableYbctid& l, const LightweightTableYbctid& r) const { + return l.table_id == r.table_id && l.ybctid == r.ybctid; + } +}; + +struct TableYbctidHasher { + using is_transparent = void; + + size_t operator()(const LightweightTableYbctid& value) const; +}; + +using OidSet = std::unordered_set; +template +using TableYbctidSetHelper = + std::unordered_set; +using MemoryOptimizedTableYbctidSet = TableYbctidSetHelper; +using TableYbctidSet = TableYbctidSetHelper; +using TableYbctidVector = std::vector; + +class TableYbctidVectorProvider { + public: + class Accessor { + public: + ~Accessor() { container_.clear(); } + TableYbctidVector* operator->() { return &container_; } + TableYbctidVector& operator*() { return container_; } + operator TableYbctidVector&() { return container_; } + + private: + explicit Accessor(TableYbctidVector* container) : container_(*container) {} + + friend class TableYbctidVectorProvider; + + TableYbctidVector& container_; + + DISALLOW_COPY_AND_ASSIGN(Accessor); + }; + + [[nodiscard]] Accessor Get() { return Accessor(&container_); } + + private: + TableYbctidVector container_; +}; + +using ExecParametersMutator = LWFunction; + +using YbctidReader = + std::function; + +Status FetchExistingYbctids( + const scoped_refptr& session, PgOid database_id, TableYbctidVector& ybctids, + const OidSet& region_local_tables, const ExecParametersMutator& exec_params_mutator); + } // namespace yb::pggate diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index d1c95fc0c21d..02835a0876e1 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1,5 +1,5 @@ //-------------------------------------------------------------------------------------------------- -// Copyright (c) YugaByteDB, Inc. +// Copyright (c) YugabyteDB, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except // in compliance with the License. You may obtain a copy of the License at @@ -21,8 +21,6 @@ #include #include -#include - #include #include "yb/client/client_utils.h" @@ -92,22 +90,6 @@ DEFINE_RUNTIME_PREVIEW_bool(ysql_pack_inserted_value, false, namespace yb::pggate { namespace { -struct TableHolder { - explicit TableHolder(const PgTableDescPtr& descr) : table_(descr) {} - PgTable table_; -}; - -class PgsqlReadOpWithPgTable : private TableHolder, public PgsqlReadOp { - public: - PgsqlReadOpWithPgTable(ThreadSafeArena* arena, const PgTableDescPtr& descr, bool is_region_local, - PgsqlMetricsCaptureType metrics_capture) - : TableHolder(descr), PgsqlReadOp(arena, *table_, is_region_local, metrics_capture) {} - - PgTable& table() { - return table_; - } -}; - Status AddColumn(PgCreateTable* pg_stmt, const char *attr_name, int attr_num, const YBCPgTypeEntity *attr_type, bool is_hash, bool is_range, bool is_desc, bool is_nulls_first) { @@ -149,166 +131,6 @@ tserver::TServerSharedObject BuildTServerSharedObject() { return CHECK_RESULT(tserver::TServerSharedObject::OpenReadOnly(FLAGS_pggate_tserver_shm_fd)); } -// Helper class to collect operations from multiple doc_ops and send them with a single perform RPC. -class PrecastRequestSender { - // Struct stores operation and table for futher sending this operation - // with the 'PgSession::RunAsync' method. - struct OperationInfo { - OperationInfo(const PgsqlOpPtr& operation_, const PgTableDesc& table_) - : operation(operation_), table(&table_) {} - PgsqlOpPtr operation; - const PgTableDesc* table; - }; - - class ResponseProvider : public PgDocResponse::Provider { - public: - // Shared state among different instances of the 'PgDocResponse' object returned by the 'Send' - // method. Response field will be initialized when all collected operations will be sent by the - // call of 'TransmitCollected' method. - using State = PgDocResponse::Data; - using StatePtr = std::shared_ptr; - - explicit ResponseProvider(const StatePtr& state) - : state_(state) {} - - Result Get() override { - SCHECK(state_->response, IllegalState, "Response is not set"); - return *state_; - } - - private: - StatePtr state_; - }; - - public: - Result Send( - PgSession& session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table, - HybridTime in_txn_limit) { - if (!collecting_mode_) { - return PgDocOp::DefaultSender( - &session, ops, ops_count, table, in_txn_limit, - ForceNonBufferable::kFalse, IsForWritePgDoc::kFalse); - } - // For now PrecastRequestSender can work only with a new in txn limit set to the current time - // for each batch of ops. It doesn't use a single in txn limit for all read ops in a statement. - // TODO: Explain why is this the case because it differs from requirement 1 in - // src/yb/yql/pggate/README - RSTATUS_DCHECK(!in_txn_limit, IllegalState, "Only zero is expected"); - for (auto end = ops + ops_count; ops != end; ++ops) { - ops_.emplace_back(*ops, table); - } - if (!provider_state_) { - provider_state_ = std::make_shared(); - } - return PgDocResponse(std::make_unique(provider_state_)); - } - - Status TransmitCollected(PgSession& session) { - auto res = DoTransmitCollected(session); - ops_.clear(); - provider_state_.reset(); - return res; - } - - void DisableCollecting() { - DCHECK(ops_.empty()); - collecting_mode_ = false; - } - - private: - Status DoTransmitCollected(PgSession& session) { - auto i = ops_.begin(); - PgDocResponse response(VERIFY_RESULT(session.RunAsync(make_lw_function( - [&i, end = ops_.end()] { - using TO = PgSession::TableOperation; - if (i == end) { - return TO(); - } - auto& info = *i++; - return TO{.operation = &info.operation, .table = info.table}; - }), HybridTime())), - {TableType::USER, IsForWritePgDoc::kFalse}); - *provider_state_ = VERIFY_RESULT(response.Get(session)); - return Status::OK(); - } - - bool collecting_mode_ = true; - ResponseProvider::StatePtr provider_state_; - boost::container::small_vector ops_; -}; - -Status FetchExistingYbctids(const PgSession::ScopedRefPtr& session, - PgOid database_id, - TableYbctidVector& ybctids, - const OidSet& region_local_tables, - const ExecParametersMutator& exec_params_mutator) { - // Group the items by the table ID. - std::sort(ybctids.begin(), ybctids.end(), [](const auto& a, const auto& b) { - return a.table_id < b.table_id; - }); - - auto arena = std::make_shared(); - - PrecastRequestSender precast_sender; - boost::container::small_vector, 16> doc_ops; - auto request_sender = [&precast_sender]( - PgSession* session, const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table, - HybridTime in_txn_limit, ForceNonBufferable force_non_bufferable, IsForWritePgDoc is_write) { - DCHECK(!force_non_bufferable); - DCHECK(!is_write); - return precast_sender.Send(*session, ops, ops_count, table, in_txn_limit); - }; - // Start all the doc_ops to read from docdb in parallel, one doc_op per table ID. - // Each doc_op will use request_sender to send all the requests with single perform RPC. - for (auto it = ybctids.begin(), end = ybctids.end(); it != end;) { - const auto table_id = it->table_id; - auto desc = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id))); - bool is_region_local = region_local_tables.find(table_id) != region_local_tables.end(); - auto metrics_capture = session->metrics().metrics_capture(); - auto read_op = std::make_shared( - arena.get(), desc, is_region_local, metrics_capture); - - auto* expr_pb = read_op->read_request().add_targets(); - expr_pb->set_column_id(to_underlying(PgSystemAttrNum::kYBTupleId)); - doc_ops.push_back(std::make_unique( - session, &read_op->table(), std::move(read_op), request_sender)); - auto& doc_op = *doc_ops.back(); - auto exec_params = doc_op.ExecParameters(); - exec_params_mutator(exec_params); - RETURN_NOT_OK(doc_op.ExecuteInit(&exec_params)); - // Populate doc_op with ybctids which belong to current table. - RETURN_NOT_OK(doc_op.PopulateByYbctidOps({make_lw_function([&it, table_id, end] { - return it != end && it->table_id == table_id ? Slice((it++)->ybctid) : Slice(); - }), static_cast(end - it)})); - RETURN_NOT_OK(doc_op.Execute()); - } - - RETURN_NOT_OK(precast_sender.TransmitCollected(*session)); - // Disable further request collecting as in the vast majority of cases new requests will not be - // initiated because requests for all ybctids has already been sent. But in case of dynamic - // splitting new requests might be sent. They will be sent and processed as usual (i.e. request - // of each doc_op will be sent individually). - precast_sender.DisableCollecting(); - // Collect the results from the docdb ops. - ybctids.clear(); - for (auto& it : doc_ops) { - for (;;) { - auto rowsets = VERIFY_RESULT(it->GetResult()); - if (rowsets.empty()) { - break; - } - for (auto& row : rowsets) { - RETURN_NOT_OK(row.ProcessSystemColumns()); - for (const auto& ybctid : row.ybctids()) { - ybctids.emplace_back(it->table()->relfilenode_id().object_oid, ybctid.ToBuffer()); - } - } - } - } - - return Status::OK(); -} - } // namespace //--------------------------------------------------------------------------------------------------