Skip to content

Commit

Permalink
[#14607] xCluster: Use tablet level xCluster consistency during read …
Browse files Browse the repository at this point in the history
…of catalog only data

Summary:
Catalog only reads like authentication, and \dt should use latest reads (tablet level consistency), as we always want to read latest data in non-user system tables. This makes sure we always respect the latest databases security policies like users, and roles.

Also when the safe time for the database is not valid yet, it allows us to still start a connection. We can then set `yb_xcluster_consistency_level` to `tablet` if needed and access the partial data.

Fixes #14607

Test Plan: XClusterConsistencyTest

Reviewers: slingam, myang, rahuldesirazu, dmitry

Reviewed By: rahuldesirazu

Subscribers: dmitry, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D21003
  • Loading branch information
hari90 committed Nov 15, 2022
1 parent 1d8e741 commit 27fa5c5
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 65 deletions.
179 changes: 118 additions & 61 deletions ent/src/yb/integration-tests/xcluster_safe_time-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@
#include "yb/client/table.h"

using std::string;

using namespace std::chrono_literals;

DECLARE_int32(xcluster_safe_time_update_interval_secs);
DECLARE_bool(TEST_xcluster_simulate_have_more_records);
DECLARE_bool(enable_load_balancing);
DECLARE_string(ysql_yb_xcluster_consistency_level);
DECLARE_int32(TEST_xcluster_simulated_lag_ms);
DECLARE_string(TEST_xcluster_simulated_lag_tablet_filter);
DECLARE_int32(cdc_max_apply_batch_num_records);
DECLARE_bool(enable_replicate_transaction_status_table);
DECLARE_string(ysql_yb_xcluster_consistency_level);
DECLARE_int32(transaction_table_num_tablets);
DECLARE_int32(cdc_max_apply_batch_num_records);
DECLARE_string(TEST_xcluster_simulated_lag_tablet_filter);

namespace yb {
using client::YBSchema;
Expand All @@ -61,9 +59,9 @@ const int kMasterCount = 3;
const int kTServerCount = 3;
const int kTabletCount = 3;
const string kTableName = "test_table";
constexpr int kWaitForRowCountTimeout = 5 * kTimeMultiplier;
const string kTableName2 = "test_table2";
const string kDatabaseName = "yugabyte";
constexpr int kWaitForRowCountTimeout = 5 * kTimeMultiplier;

class XClusterSafeTimeTest : public TwoDCTestBase {
public:
Expand Down Expand Up @@ -280,7 +278,7 @@ string GetCompleteTableName(const YBTableName& table) {
}
} // namespace

class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
class XClusterConsistencyTest : public TwoDCTestBase {
public:
void SetUp() override {
// Skip in TSAN as InitDB times out.
Expand Down Expand Up @@ -309,7 +307,7 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
kTableName,
{} /*tablegroup_name*/,
kTabletCount));
ASSERT_OK(producer_client()->OpenTable(table_name, &producer_table_));
ASSERT_OK(producer_client()->OpenTable(table_name, &producer_table1_));

auto table_name2 = ASSERT_RESULT(CreateYsqlTable(
&producer_cluster_,
Expand All @@ -318,7 +316,7 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
kTableName2,
{} /*tablegroup_name*/,
1 /*num_tablets*/));
ASSERT_OK(producer_client()->OpenTable(table_name2, &producer_table_2));
ASSERT_OK(producer_client()->OpenTable(table_name2, &producer_table2_));
});

auto consumer_cluster_future = std::async(std::launch::async, [&] {
Expand All @@ -329,7 +327,7 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
kTableName,
{} /*tablegroup_name*/,
kTabletCount));
ASSERT_OK(consumer_client()->OpenTable(table_name, &consumer_table_));
ASSERT_OK(consumer_client()->OpenTable(table_name, &consumer_table1_));

auto table_name2 = ASSERT_RESULT(CreateYsqlTable(
&consumer_cluster_,
Expand All @@ -338,7 +336,7 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
kTableName2,
{} /*tablegroup_name*/,
1 /*num_tablets*/));
ASSERT_OK(consumer_client()->OpenTable(table_name2, &consumer_table_2));
ASSERT_OK(consumer_client()->OpenTable(table_name2, &consumer_table2_));
});

producer_cluster_future.get();
Expand All @@ -347,23 +345,24 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
ASSERT_OK(GetNamespaceId());

ASSERT_OK(producer_cluster_.client_->GetTablets(
producer_table_->name(), 0 /* max_tablets */, &producer_tablet_ids_, NULL));
producer_table1_->name(), 0 /* max_tablets */, &producer_tablet_ids_, NULL));
ASSERT_EQ(producer_tablet_ids_.size(), kTabletCount);

std::vector<TabletId> producer_table2_tablet_ids;
ASSERT_OK(producer_cluster_.client_->GetTablets(
producer_table_2->name(), 0 /* max_tablets */, &producer_table2_tablet_ids, NULL));
producer_table2_->name(), 0 /* max_tablets */, &producer_table2_tablet_ids, NULL));
ASSERT_EQ(producer_table2_tablet_ids.size(), 1);
producer_tablet_ids_.push_back(producer_table2_tablet_ids[0]);

YBTableName global_tran_table_name(
YBTableName producer_tran_table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, kGlobalTransactionsTableName);
std::shared_ptr<YBTable> global_tran_table;
ASSERT_OK(producer_client()->OpenTable(global_tran_table_name, &global_tran_table));
ASSERT_OK(producer_client()->OpenTable(producer_tran_table_name, &producer_tran_table_));
ASSERT_OK(producer_cluster_.client_->GetTablets(
global_tran_table_name, 0 /* max_tablets */, &global_tran_tablet_ids_, NULL));
producer_tran_table_name, 0 /* max_tablets */, &consumer_tran_tablet_ids_, NULL));

ASSERT_OK(PreReplicationSetup());

ASSERT_OK(SetupUniverseReplication({producer_table_, producer_table_2}));
ASSERT_OK(SetupUniverseReplication({producer_table1_, producer_table2_}));

// Verify that universe was setup on consumer.
master::GetUniverseReplicationResponsePB resp;
Expand All @@ -374,27 +373,29 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
ASSERT_OK(ChangeXClusterRole(cdc::XClusterRole::STANDBY));

master::ListCDCStreamsResponsePB stream_resp;
ASSERT_OK(GetCDCStreamForTable(producer_table_->id(), &stream_resp));
ASSERT_OK(GetCDCStreamForTable(producer_table1_->id(), &stream_resp));
ASSERT_EQ(stream_resp.streams_size(), 1);
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_table_->id());
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_table1_->id());
stream_ids_.push_back(stream_resp.streams(0).stream_id());

ASSERT_OK(GetCDCStreamForTable(producer_table_2->id(), &stream_resp));
ASSERT_OK(GetCDCStreamForTable(producer_table2_->id(), &stream_resp));
ASSERT_EQ(stream_resp.streams_size(), 1);
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_table_2->id());
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_table2_->id());
stream_ids_.push_back(stream_resp.streams(0).stream_id());

ASSERT_OK(GetCDCStreamForTable(global_tran_table->id(), &stream_resp));
ASSERT_OK(GetCDCStreamForTable(producer_tran_table_->id(), &stream_resp));
ASSERT_EQ(stream_resp.streams_size(), 1);
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), global_tran_table->id());
ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_tran_table_->id());
stream_ids_.push_back(stream_resp.streams(0).stream_id());

ASSERT_OK(CorrectlyPollingAllTablets(
consumer_cluster(),
kTabletCount + 1 + static_cast<uint32_t>(global_tran_tablet_ids_.size())));
ASSERT_OK(WaitForValidSafeTimeOnAllTServers());
kTabletCount + 1 + static_cast<uint32_t>(consumer_tran_tablet_ids_.size())));
ASSERT_OK(PostReplicationSetup());
}

virtual Status PreReplicationSetup() { return OK(); }

Status GetNamespaceId() {
master::GetNamespaceInfoResponsePB resp;

Expand Down Expand Up @@ -452,12 +453,12 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
return PQntuples(results.get());
}

Status WaitForValidSafeTimeOnAllTServers() {
virtual Status PostReplicationSetup() {
// Wait till we have a valid safe time on all tservers.
for (auto& tserver : consumer_cluster()->mini_tablet_servers()) {
RETURN_NOT_OK(WaitFor(
[&]() -> Result<bool> {
auto safe_time =
tserver->server()->GetXClusterSafeTimeMap().GetSafeTime(namespace_id_);
auto safe_time = tserver->server()->GetXClusterSafeTimeMap().GetSafeTime(namespace_id_);
if (!safe_time) {
return false;
}
Expand Down Expand Up @@ -585,26 +586,26 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase {
protected:
MonoDelta safe_time_propagation_timeout_;
std::vector<string> stream_ids_;
std::shared_ptr<client::YBTable> producer_table_, producer_table_2;
std::shared_ptr<client::YBTable> consumer_table_, consumer_table_2;
std::shared_ptr<client::YBTable> producer_table1_, producer_table2_, producer_tran_table_;
std::shared_ptr<client::YBTable> consumer_table1_, consumer_table2_;
std::vector<TabletId> producer_tablet_ids_;
std::vector<TabletId> global_tran_tablet_ids_;
std::vector<TabletId> consumer_tran_tablet_ids_;
string namespace_id_;
std::map<string, uint64_t> producer_tablet_read_time_;
};

TEST_F(XClusterSafeTimeYsqlTest, ConsistentReads) {
TEST_F(XClusterConsistencyTest, ConsistentReads) {
constexpr auto kNumRecordsPerBatch = 10;
CHECK_GE(FLAGS_cdc_max_apply_batch_num_records, kNumRecordsPerBatch);
uint32_t num_records_written = 0;
StoreReadTimes();

WriteWorkload(producer_table_->name(), 0, kNumRecordsPerBatch);
WriteWorkload(producer_table1_->name(), 0, kNumRecordsPerBatch);
num_records_written += kNumRecordsPerBatch;

// Verify data is written on the consumer.
ASSERT_OK(WaitForRowCount(consumer_table_->name(), num_records_written));
ASSERT_OK(ValidateConsumerRows(consumer_table_->name(), num_records_written));
ASSERT_OK(WaitForRowCount(consumer_table1_->name(), num_records_written));
ASSERT_OK(ValidateConsumerRows(consumer_table1_->name(), num_records_written));
ASSERT_EQ(CountTabletsWithNewReadTimes(), kTabletCount);

// Get the initial regular rpo.
Expand All @@ -614,28 +615,28 @@ TEST_F(XClusterSafeTimeYsqlTest, ConsistentReads) {
// Pause replication on only 1 tablet.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulated_lag_tablet_filter) =
producer_tablet_ids_[0];
SetAtomicFlag(-1, &FLAGS_TEST_xcluster_simulated_lag_ms);
ASSERT_OK(SET_FLAG(TEST_xcluster_simulated_lag_ms, -1));
// Wait for in flight GetChanges.
SleepFor(2s * kTimeMultiplier);
StoreReadTimes();

// Write a batch of 100 in one transaction in table1 and a single row without a transaction in
// table2.
WriteWorkload(producer_table_->name(), kNumRecordsPerBatch, 2 * kNumRecordsPerBatch);
WriteWorkload(producer_table_2->name(), 0, 1);
WriteWorkload(producer_table1_->name(), kNumRecordsPerBatch, 2 * kNumRecordsPerBatch);
WriteWorkload(producer_table2_->name(), 0, 1);

// Verify none of the new rows in either table are visible.
ASSERT_NOK(
WaitForRowCount(consumer_table_->name(), num_records_written + 1, true /*allow_greater*/));
ASSERT_NOK(WaitForRowCount(consumer_table_2->name(), 1, true /*allow_greater*/));
WaitForRowCount(consumer_table1_->name(), num_records_written + 1, true /*allow_greater*/));
ASSERT_NOK(WaitForRowCount(consumer_table2_->name(), 1, true /*allow_greater*/));

// 2 tablets of table 1 and 1 tablet from table2 should still contain the rows.
ASSERT_EQ(CountTabletsWithNewReadTimes(), kTabletCount);
ASSERT_OK(ValidateConsumerRows(consumer_table_->name(), num_records_written));
ASSERT_OK(ValidateConsumerRows(consumer_table1_->name(), num_records_written));

// Reading latest data should return a subset of the rows but not all rows.
const auto latest_row_count =
ASSERT_RESULT(GetRowCount(consumer_table_->name(), &consumer_cluster_, true /*read_latest*/));
const auto latest_row_count = ASSERT_RESULT(
GetRowCount(consumer_table1_->name(), &consumer_cluster_, true /*read_latest*/));
ASSERT_GT(latest_row_count, num_records_written);
ASSERT_LT(latest_row_count, num_records_written + kNumRecordsPerBatch);

Expand All @@ -646,12 +647,12 @@ TEST_F(XClusterSafeTimeYsqlTest, ConsistentReads) {
ASSERT_GT(high_rpo, MonoDelta::FromSeconds(kWaitForRowCountTimeout).ToMicroseconds());

// Resume replication and verify all data is written on the consumer.
SetAtomicFlag(0, &FLAGS_TEST_xcluster_simulated_lag_ms);
ASSERT_OK(SET_FLAG(TEST_xcluster_simulated_lag_ms, 0));
num_records_written += kNumRecordsPerBatch;
ASSERT_OK(WaitForRowCount(consumer_table_->name(), num_records_written));
ASSERT_OK(ValidateConsumerRows(consumer_table_->name(), num_records_written));
ASSERT_OK(WaitForRowCount(consumer_table_2->name(), 1));
ASSERT_OK(ValidateConsumerRows(consumer_table_2->name(), 1));
ASSERT_OK(WaitForRowCount(consumer_table1_->name(), num_records_written));
ASSERT_OK(ValidateConsumerRows(consumer_table1_->name(), num_records_written));
ASSERT_OK(WaitForRowCount(consumer_table2_->name(), 1));
ASSERT_OK(ValidateConsumerRows(consumer_table2_->name(), 1));
ASSERT_EQ(CountTabletsWithNewReadTimes(), kTabletCount + 1);

// Check that safe time rpo has dropped again.
Expand All @@ -660,36 +661,92 @@ TEST_F(XClusterSafeTimeYsqlTest, ConsistentReads) {
ASSERT_LT(final_rpo, high_rpo);
}

TEST_F(XClusterSafeTimeYsqlTest, LagInTransactionsTable) {
TEST_F(XClusterConsistencyTest, LagInTransactionsTable) {
constexpr auto kNumRecordsPerBatch = 10;
CHECK_GE(FLAGS_cdc_max_apply_batch_num_records, kNumRecordsPerBatch);

// Pause replication on global transactions table
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulated_lag_tablet_filter) =
JoinStrings(global_tran_tablet_ids_, ",");
SetAtomicFlag(-1, &FLAGS_TEST_xcluster_simulated_lag_ms);
JoinStrings(consumer_tran_tablet_ids_, ",");
ASSERT_OK(SET_FLAG(TEST_xcluster_simulated_lag_ms, -1));
// Wait for in flight GetChanges
SleepFor(2s * kTimeMultiplier);
StoreReadTimes();

// Write a batch of 100 in one transaction in table1 and a single row without a transaction in
// table2
WriteWorkload(producer_table_->name(), 0, kNumRecordsPerBatch);
WriteWorkload(producer_table_2->name(), 0, 1);
WriteWorkload(producer_table1_->name(), 0, kNumRecordsPerBatch);
WriteWorkload(producer_table2_->name(), 0, 1);

// Verify none of the new rows in either table are visible
ASSERT_NOK(WaitForRowCount(consumer_table_->name(), 1, true /*allow_greater*/));
ASSERT_NOK(WaitForRowCount(consumer_table_2->name(), 1, true /*allow_greater*/));
ASSERT_NOK(WaitForRowCount(consumer_table1_->name(), 1, true /*allow_greater*/));
ASSERT_NOK(WaitForRowCount(consumer_table2_->name(), 1, true /*allow_greater*/));

// 3 tablets of table 1 and 1 tablet from table2 should still contain the rows
ASSERT_EQ(CountTabletsWithNewReadTimes(), kTabletCount + 1);

// Resume replication and verify all data is written on the consumer
SetAtomicFlag(0, &FLAGS_TEST_xcluster_simulated_lag_ms);
ASSERT_OK(WaitForRowCount(consumer_table_->name(), kNumRecordsPerBatch));
ASSERT_OK(ValidateConsumerRows(consumer_table_->name(), kNumRecordsPerBatch));
ASSERT_OK(WaitForRowCount(consumer_table_2->name(), 1));
ASSERT_OK(ValidateConsumerRows(consumer_table_2->name(), 1));
ASSERT_OK(SET_FLAG(TEST_xcluster_simulated_lag_ms, 0));
ASSERT_OK(WaitForRowCount(consumer_table1_->name(), kNumRecordsPerBatch));
ASSERT_OK(ValidateConsumerRows(consumer_table1_->name(), kNumRecordsPerBatch));
ASSERT_OK(WaitForRowCount(consumer_table2_->name(), 1));
ASSERT_OK(ValidateConsumerRows(consumer_table2_->name(), 1));
}

class XClusterConsistencyNoSafeTimeTest : public XClusterConsistencyTest {
public:
void SetUp() override {
ASSERT_OK(SET_FLAG(TEST_xcluster_simulated_lag_ms, -1));
XClusterConsistencyTest::SetUp();
}

Status PreReplicationSetup() override {
// Write some custom data to system tables so that we have some rows to scan there.
auto conn = VERIFY_RESULT(consumer_cluster_.ConnectToDB(
consumer_table1_->name().namespace_name(), true /*simple_query_protocol*/));

return conn.Execute("CREATE USER clock WITH PASSWORD 'clock'");
}

Status PostReplicationSetup() override {
// Wait till we get "XCluster safe time not yet initialized" error on all t-servers. This
// ensures we got some safe time data but the namespace alone lacks a valid safe time.
for (auto& tserver : consumer_cluster()->mini_tablet_servers()) {
RETURN_NOT_OK(WaitFor(
[&]() -> Result<bool> {
auto safe_time = tserver->server()->GetXClusterSafeTimeMap().GetSafeTime(namespace_id_);
CHECK(!safe_time);

if (safe_time.status().ToString().find(
"XCluster safe time not yet initialized for namespace") != string::npos) {
return true;
}
return false;
},
safe_time_propagation_timeout_,
Format("Wait for safe_time of namespace $0 to be in-valid", namespace_id_)));
}

return OK();
}
};

TEST_F_EX(XClusterConsistencyTest, LoginWithNoSafeTime, XClusterConsistencyNoSafeTimeTest) {
// Verify that we can login and query a catalog table.
auto conn = ASSERT_RESULT(consumer_cluster_.ConnectToDB(
consumer_table1_->name().namespace_name(), true /*simple_query_protocol*/));

auto result = ASSERT_RESULT(conn.Fetch("SELECT relname FROM pg_catalog.pg_class LIMIT 1;"));

// Verify that we can't read user table with default consistency.
const auto query = Format("SELECT * FROM $0", GetCompleteTableName(consumer_table1_->name()));

ASSERT_NOK(conn.Execute(query));

// Verify that we can't read user table with tablet level consistency.
ASSERT_OK(conn.Execute("SET yb_xcluster_consistency_level = tablet"));
result = ASSERT_RESULT(conn.Fetch(query));
}

} // namespace enterprise
} // namespace yb
11 changes: 7 additions & 4 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,13 @@ Result<PerformFuture> PgSession::Perform(
}
options.set_force_global_transaction(global_transaction);

// For DDLs we always read latest data.
options.set_use_xcluster_database_consistency(
!pg_txn_manager_->IsDdlMode() &&
yb_xcluster_consistency_level == XCLUSTER_CONSISTENCY_DATABASE);
auto use_xcluster_database_consistency =
yb_xcluster_consistency_level == XCLUSTER_CONSISTENCY_DATABASE;
if (use_catalog_session || pg_txn_manager_->IsDdlMode()) {
// For Catalog sessions and DDLs we always want to read our universe's latest data.
use_xcluster_database_consistency = false;
}
options.set_use_xcluster_database_consistency(use_xcluster_database_consistency);

auto promise = std::make_shared<std::promise<PerformResult>>();

Expand Down

0 comments on commit 27fa5c5

Please sign in to comment.