Skip to content

Commit

Permalink
[#22519] YSQL: Move ExplicitRowLockBuffer class into separate file
Browse files Browse the repository at this point in the history
Summary:
Move the `ExplicitRowLockBuffer` class into newly created file `pg_explicit_row_lock_buffer.cc`
Jira: DB-11445

Test Plan: Jenkins

Reviewers: pjain, kramanathan, myang, telgersma

Reviewed By: kramanathan

Subscribers: yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37797
  • Loading branch information
d-uspenskiy committed Sep 10, 2024
1 parent f5ad1fb commit eed826d
Show file tree
Hide file tree
Showing 8 changed files with 488 additions and 404 deletions.
37 changes: 19 additions & 18 deletions src/yb/yql/pggate/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,37 @@ add_subdirectory(ysql_bench_metrics_handler)

# PGGate API library.
set(PGGATE_SRCS
ybc_pggate.cc
pggate.cc
pggate_thread_local_vars.cc
pg_callbacks.cc
pg_client.cc
pg_op.cc
pg_operation_buffer.cc
pg_perform_future.cc
pg_sample.cc
pg_session.cc
pg_column.cc
pg_ddl.cc
pg_dml.cc
pg_dml_write.cc
pg_dml_read.cc
pg_dml_write.cc
pg_doc_metrics.cc
pg_doc_op.cc
pg_explicit_row_lock_buffer.cc
pg_expr.cc
pg_function.cc
pg_function_helpers.cc
pg_type.cc
pggate.cc
pggate_thread_local_vars.cc
pg_memctx.cc
pg_op.cc
pg_operation_buffer.cc
pg_perform_future.cc
pg_sample.cc
pg_select.cc
pg_select_index.cc
pg_expr.cc
pg_column.cc
pg_doc_op.cc
pg_session.cc
pg_sys_table_prefetcher.cc
pg_table.cc
pg_tabledesc.cc
pg_tools.cc
pg_txn_manager.cc
pg_type.cc
pg_value.cc
pg_memctx.cc
pg_callbacks.cc
pg_tools.cc
pg_sys_table_prefetcher.cc
pg_doc_metrics.cc)
ybc_pggate.cc)

ADD_YB_LIBRARY(yb_pggate_flags
SRCS pggate_flags.cc
Expand Down
88 changes: 88 additions & 0 deletions src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/yql/pggate/pg_explicit_row_lock_buffer.h"

#include <string>
#include <utility>

#include "yb/yql/pggate/util/ybc_util.h"
#include "yb/util/scope_exit.h"

namespace yb::pggate {

ExplicitRowLockBuffer::ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader)
: ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) {
}

Status ExplicitRowLockBuffer::Add(
Info&& info, const LightweightTableYbctid& key, bool is_region_local) {
if (info_ && *info_ != info) {
RETURN_NOT_OK(DoFlush());
}
if (!info_) {
info_.emplace(std::move(info));
} else if (intents_.contains(key)) {
return Status::OK();
}

if (is_region_local) {
region_local_tables_.insert(key.table_id);
}
DCHECK(is_region_local || !region_local_tables_.contains(key.table_id));
intents_.emplace(key.table_id, std::string(key.ybctid));
return narrow_cast<int>(intents_.size()) >= yb_explicit_row_locking_batch_size
? DoFlush() : Status::OK();
}

Status ExplicitRowLockBuffer::Flush() {
return IsEmpty() ? Status::OK() : DoFlush();
}

Status ExplicitRowLockBuffer::DoFlush() {
DCHECK(!IsEmpty());
auto scope = ScopeExit([this] { Clear(); });
auto ybctids = ybctid_container_provider_.Get();
auto initial_intents_size = intents_.size();
ybctids->reserve(initial_intents_size);
for (auto it = intents_.begin(); it != intents_.end();) {
auto node = intents_.extract(it++);
ybctids->push_back(std::move(node.value()));
}
RETURN_NOT_OK(ybctid_reader_(
info_->database_id, ybctids, region_local_tables_,
make_lw_function(
[&info = *info_](PgExecParameters& params) {
params.rowmark = info.rowmark;
params.pg_wait_policy = info.pg_wait_policy;
params.docdb_wait_policy = info.docdb_wait_policy;
})));
SCHECK(
initial_intents_size == ybctids->size(), NotFound,
"Some of the requested ybctids are missing");
return Status::OK();
}

void ExplicitRowLockBuffer::Clear() {
intents_.clear();
info_.reset();
region_local_tables_.clear();
}

bool ExplicitRowLockBuffer::IsEmpty() const {
return !info_;
}

} // namespace yb::pggate
58 changes: 58 additions & 0 deletions src/yb/yql/pggate/pg_explicit_row_lock_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#pragma once

#include <functional>
#include <optional>

#include "yb/gutil/macros.h"

#include "yb/util/status.h"
#include "yb/util/lw_function.h"

#include "yb/yql/pggate/pg_tools.h"

namespace yb::pggate {

class ExplicitRowLockBuffer {
public:
struct Info {
int rowmark;
int pg_wait_policy;
int docdb_wait_policy;
PgOid database_id;

friend bool operator==(const Info&, const Info&) = default;
};

ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader);
Status Add(
Info&& info, const LightweightTableYbctid& key, bool is_region_local);
Status Flush();
void Clear();
bool IsEmpty() const;

private:
Status DoFlush();

TableYbctidVectorProvider& ybctid_container_provider_;
const YbctidReader& ybctid_reader_;
TableYbctidSet intents_;
OidSet region_local_tables_;
std::optional<Info> info_;
};

} // namespace yb::pggate
78 changes: 0 additions & 78 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

#include <algorithm>
#include <future>
#include <memory>
#include <optional>
#include <utility>

#include <boost/functional/hash.hpp>

#include "yb/client/table_info.h"

#include "yb/common/pg_types.h"
Expand All @@ -42,7 +39,6 @@
#include "yb/util/format.h"
#include "yb/util/logging.h"
#include "yb/util/result.h"
#include "yb/util/scope_exit.h"
#include "yb/util/status_format.h"
#include "yb/util/string_util.h"

Expand Down Expand Up @@ -388,80 +384,6 @@ class PgSession::RunHelper {
const ForceFlushBeforeNonBufferableOp force_flush_before_non_bufferable_op_;
};

//--------------------------------------------------------------------------------------------------
// Class TableYbctidHasher
//--------------------------------------------------------------------------------------------------

size_t TableYbctidHasher::operator()(const LightweightTableYbctid& value) const {
size_t hash = 0;
boost::hash_combine(hash, value.table_id);
boost::hash_range(hash, value.ybctid.begin(), value.ybctid.end());
return hash;
}

ExplicitRowLockBuffer::ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader)
: ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) {
}

Status ExplicitRowLockBuffer::Add(
Info&& info, const LightweightTableYbctid& key, bool is_region_local) {
if (info_ && *info_ != info) {
RETURN_NOT_OK(DoFlush());
}
if (!info_) {
info_.emplace(std::move(info));
} else if (intents_.contains(key)) {
return Status::OK();
}

if (is_region_local) {
region_local_tables_.insert(key.table_id);
}
DCHECK(is_region_local || !region_local_tables_.contains(key.table_id));
intents_.emplace(key.table_id, std::string(key.ybctid));
return narrow_cast<int>(intents_.size()) >= yb_explicit_row_locking_batch_size
? DoFlush() : Status::OK();
}

Status ExplicitRowLockBuffer::Flush() {
return IsEmpty() ? Status::OK() : DoFlush();
}

Status ExplicitRowLockBuffer::DoFlush() {
DCHECK(!IsEmpty());
auto scope = ScopeExit([this] { Clear(); });
auto ybctids = ybctid_container_provider_.Get();
auto initial_intents_size = intents_.size();
ybctids->reserve(initial_intents_size);
for (auto it = intents_.begin(); it != intents_.end();) {
auto node = intents_.extract(it++);
ybctids->push_back(std::move(node.value()));
}
RETURN_NOT_OK(ybctid_reader_(
info_->database_id, ybctids, region_local_tables_,
make_lw_function(
[&info = *info_](PgExecParameters& params) {
params.rowmark = info.rowmark;
params.pg_wait_policy = info.pg_wait_policy;
params.docdb_wait_policy = info.docdb_wait_policy;
})));
SCHECK(initial_intents_size == ybctids->size(), NotFound,
"Some of the requested ybctids are missing");
return Status::OK();
}

void ExplicitRowLockBuffer::Clear() {
intents_.clear();
info_.reset();
region_local_tables_.clear();
}

bool ExplicitRowLockBuffer::IsEmpty() const {
return !info_;
}

//--------------------------------------------------------------------------------------------------
// Class PgSession
//--------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit eed826d

Please sign in to comment.