Skip to content

Commit

Permalink
[#12254, #11640] YSQL: Support read committed semantics for all non-D…
Browse files Browse the repository at this point in the history
…DL work

Summary:
Till now, we supported READ COMMITTED semantics only for SELECT/ INSERT/ UPDATE
and DELETE statements. This is done by restarting the statement after rolling
back work done by the failed execution of the statement.

With this diff, the logic is allowed for all other statements except DDLs.

TODOs for a later diff:
(1) Move read committed retries to a finer level in functions and procedures i.e., per query (#12958).
This is required for multiple reasons:

(i) Performance - in read committed isolation we can retry just the statement that faced a
kConflict or kReadRestart in the function/ procedure. We don't have to restart the whole
procedure or function.

(ii) We should not be redoing non-transactional side-effects in functions and procedures.
This will be resolved automatically once we perform retries at a statement level in the
func/ proc.

(2) Allow saving a snapshot, switching to another, and reusing the saved snapshot in YSQL.
Detailed use case in #12959

Test Plan:
./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress#isolationRegress
./yb_build.sh --java-test org.yb.pgsql.TestPgReadCommittedVolatileFuncs#testFunctionSemantics

Reviewers: alex, mtakahara

Reviewed By: mtakahara

Subscribers: lnguyen, bogdan, zyu, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D17440
  • Loading branch information
pkj415 committed Jul 11, 2022
1 parent 1634fb2 commit a224db2
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

package org.yb.pgsql;

import static org.yb.AssertionWrappers.*;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.yb.util.ThreadUtil;
import org.yb.util.YBTestRunnerNonTsanOnly;

/**
 * Test to ensure that each statement in a volatile plpgsql function uses a
 * new snapshot and all statements in immutable and stable functions use the
 * same snapshot.
 *
 * Quoting from Pg docs (https://www.postgresql.org/docs/current/xfunc-volatility.html):
 * "STABLE and IMMUTABLE functions use a snapshot established as of the start of the calling
 * query, whereas VOLATILE functions obtain a fresh snapshot at the start of each
 * query they execute."
 */
@RunWith(value = YBTestRunnerNonTsanOnly.class)
public class TestPgReadCommittedVolatileFuncs extends BasePgSQLTest {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestPgReadCommittedVolatileFuncs.class);

  /**
   * Returns the tserver flags of the parent class with
   * {@code yb_enable_read_committed_isolation} additionally set to {@code "true"}.
   */
  @Override
  protected Map<String, String> getTServerFlags() {
    Map<String, String> flags = super.getTServerFlags();
    flags.put("yb_enable_read_committed_isolation", "true");
    return flags;
  }

  /**
   * Builds the CREATE OR REPLACE FUNCTION statement for a plpgsql function named
   * {@code <volatilityClass>_plpgsql_func}. The function returns the value of {@code v} for the
   * row {@code k=1} twice, with a 2 second sleep in between the two reads.
   *
   * @param volatilityClass volatility keyword, used both as the function name prefix and as the
   *                        volatility marker of the function
   * @return the SQL text that defines the function
   */
  private String getFunctionDefinitionStr(String volatilityClass) {
    return
      "CREATE OR REPLACE FUNCTION " + volatilityClass + "_plpgsql_func() RETURNS TABLE(v int) " +
      " AS $$" +
      " BEGIN " +
      " RETURN QUERY SELECT test.v FROM test WHERE k=1;" +
      " PERFORM pg_sleep(2);" +
      " RETURN QUERY SELECT test.v FROM test WHERE k=1;" +
      " END;" +
      " $$ LANGUAGE PLPGSQL " + volatilityClass;
  }

  /**
   * Runs one writer that keeps updating (and committing) the row {@code k=1} for about 5 seconds,
   * concurrently with one reader per volatility class. Each reader calls its function once and
   * compares the two values it returned: they must differ for VOLATILE and be equal for
   * IMMUTABLE and STABLE.
   *
   * @throws Exception if setup/cleanup SQL fails or if any worker thread fails
   */
  @Test
  public void testFunctionSemantics() throws Exception {
    String[] volatilityClasses = {"VOLATILE", "IMMUTABLE", "STABLE"};
    try (Statement statement = connection.createStatement()) {
      statement.execute("CREATE TABLE TEST (k INT PRIMARY KEY, v INT)");
      statement.execute("INSERT INTO TEST VALUES (1, 0)");
      statement.execute(
        "CREATE OR REPLACE PROCEDURE update_row() " +
        " AS $$" +
        " DECLARE " +
        " end_time TIMESTAMP;" +
        " BEGIN " +
        " end_time := NOW() + INTERVAL '5 seconds';" +
        " WHILE end_time > NOW() LOOP" +
        " UPDATE test SET v = EXTRACT(EPOCH FROM AGE(end_time, NOW())) WHERE k=1;" +
        " COMMIT;" +
        " END LOOP;" +
        " END;" +
        " $$ LANGUAGE PLPGSQL");
      for (String volatilityClass : volatilityClasses) {
        statement.execute(getFunctionDefinitionStr(volatilityClass));
      }
    }

    // One thread for the writer plus one per volatility class.
    ExecutorService es = Executors.newFixedThreadPool(4);
    try {
      List<Future<?>> futures = new ArrayList<>();
      List<Runnable> runnables = new ArrayList<>();

      // Writer: keeps changing the row so that a fresh snapshot observes a different value.
      runnables.add(() -> {
        try (Connection conn =
                 getConnectionBuilder().withIsolationLevel(IsolationLevel.READ_COMMITTED)
                     .withAutoCommit(AutoCommit.ENABLED).connect();
             Statement stmt = conn.createStatement()) {
          stmt.execute("CALL update_row();");
        } catch (Exception ex) {
          // Log with the stack trace: fail() below only carries the message.
          LOG.error("update_row() worker failed", ex);
          fail("Failed due to exception: " + ex.getMessage());
        }
      });

      // Readers: one per volatility class.
      for (String volatilityClass : volatilityClasses) {
        runnables.add(() -> {
          try (Connection conn =
                   getConnectionBuilder().withIsolationLevel(IsolationLevel.READ_COMMITTED)
                       .withAutoCommit(AutoCommit.ENABLED).connect();
               Statement stmt = conn.createStatement();
               ResultSet rs =
                   stmt.executeQuery("SELECT " + volatilityClass + "_plpgsql_func();")) {
            assertTrue(rs.next());
            int firstVal = rs.getInt(volatilityClass + "_plpgsql_func");
            assertTrue(rs.next());
            int secondVal = rs.getInt(volatilityClass + "_plpgsql_func");
            if (volatilityClass.equalsIgnoreCase("VOLATILE")) {
              // Each query in a VOLATILE function is expected to see the writer's progress.
              assertTrue(secondVal != firstVal);
            } else {
              // IMMUTABLE/STABLE functions are expected to reuse the calling query's snapshot.
              assertTrue(secondVal == firstVal);
            }
          } catch (Exception ex) {
            // Log with the stack trace: fail() below only carries the message.
            LOG.error(volatilityClass + " function worker failed", ex);
            fail("Failed due to exception: " + ex.getMessage());
          }
        });
      }

      for (Runnable r : runnables) {
        futures.add(es.submit(r));
      }

      try {
        LOG.info("Waiting for all threads");
        for (Future<?> future : futures) {
          // A failure inside a worker surfaces here as an ExecutionException.
          future.get(10, TimeUnit.SECONDS);
        }
      } catch (TimeoutException ex) {
        LOG.warn("Threads info:\n\n" + ThreadUtil.getAllThreadsInfo());
        fail("Waiting for threads timed out, this is unexpected!");
      }
    } finally {
      // Always release the pool threads, including on failure or timeout.
      es.shutdownNow();
    }

    try (Statement statement = connection.createStatement()) {
      statement.execute("DROP TABLE test");
      statement.execute("DROP PROCEDURE update_row");
      for (String volatilityClass : volatilityClasses) {
        statement.execute("DROP FUNCTION " + volatilityClass + "_plpgsql_func");
      }
    }
  }
}
2 changes: 0 additions & 2 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -2954,8 +2954,6 @@ StartTransactionCommandInternal(bool yb_skip_read_committed_handling)
* READ COMMITTED isolation level.
*/
s->ybDataSentForCurrQuery = false;
HandleYBStatus(YBCPgResetTransactionReadPoint());
elog(DEBUG2, "Resetting read point for statement in Read Committed txn");

/*
* Create a new internal sub txn before any execution. This aids in rolling back any changes
Expand Down
29 changes: 25 additions & 4 deletions src/postgres/src/backend/tcop/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -4072,7 +4072,8 @@ static bool
yb_is_restart_possible(const ErrorData* edata,
int attempt,
const YBQueryRestartData* restart_data,
bool* retries_exhausted)
bool* retries_exhausted,
bool* rc_ignoring_ddl_statement)
{
if (!IsYugaByteEnabled())
{
Expand Down Expand Up @@ -4178,9 +4179,18 @@ yb_is_restart_possible(const ErrorData* edata,
bool is_read = strncmp(command_tag, "SELECT", 6) == 0;
bool is_dml = YBIsDmlCommandTag(command_tag);

if (!(is_read || is_dml))
if (IsYBReadCommitted())
{
// As of now, we only support retries with SELECT/UPDATE/INSERT/DELETE. There are other
if (YBGetDdlNestingLevel() != 0) {
if (yb_debug_log_internal_restarts)
elog(LOG, "READ COMMITTED retry semantics don't support DDLs");
*rc_ignoring_ddl_statement = true;
return false;
}
}
else if (!(is_read || is_dml))
{
// if !read committed, we only support retries with SELECT/UPDATE/INSERT/DELETE. There are other
// statements that might result in a kReadRestart/kConflict like CREATE INDEX. We don't retry
// those as of now.
if (yb_debug_log_internal_restarts)
Expand Down Expand Up @@ -4402,8 +4412,10 @@ yb_attempt_to_restart_on_error(int attempt,
MemoryContext error_context = MemoryContextSwitchTo(exec_context);
ErrorData* edata = CopyErrorData();
bool retries_exhausted = false;
bool rc_ignoring_ddl_statement = false;

if (yb_is_restart_possible(edata, attempt, restart_data, &retries_exhausted)) {
if (yb_is_restart_possible(
edata, attempt, restart_data, &retries_exhausted, &rc_ignoring_ddl_statement)) {
if (yb_debug_log_internal_restarts)
{
ereport(LOG,
Expand Down Expand Up @@ -4462,6 +4474,7 @@ yb_attempt_to_restart_on_error(int attempt,
ResourceOwnerNewParent(portal->resowner, NULL);
}

// TODO(read committed): remove this once the feature is GA
Assert(strcmp(GetCurrentTransactionName(), YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME) == 0);
RollbackAndReleaseCurrentSubTransaction();
BeginInternalSubTransactionForReadCommittedStatement();
Expand Down Expand Up @@ -4533,10 +4546,18 @@ yb_attempt_to_restart_on_error(int attempt,
}
} else {
/* if we shouldn't restart - propagate the error */

if (rc_ignoring_ddl_statement) {
edata->message = psprintf(
"Read Committed txn cannot proceed because of error in DDL. %s", edata->message);
ReThrowError(edata);
}

if (retries_exhausted) {
edata->message = psprintf("%s. %s", "All transparent retries exhausted", edata->message);
ReThrowError(edata);
}

MemoryContextSwitchTo(error_context);
PG_RE_THROW();
}
Expand Down
16 changes: 16 additions & 0 deletions src/postgres/src/backend/utils/time/snapmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,22 @@ GetTransactionSnapshot(void)

CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

/*
* YB: We have to RESET read point in YSQL for READ COMMITTED isolation level.
* A read point is analogous to the snapshot in PostgreSQL.
*
* We also need to flush all earlier operations so that they complete on the
* previous snapshot.
*
* READ COMMITTED semantics don't apply to DDLs.
*/
if (IsYBReadCommitted() && YBGetDdlNestingLevel() == 0)
{
elog(DEBUG2, "Resetting read point for statement in Read Committed txn");
HandleYBStatus(YBCPgFlushBufferedOperations());
HandleYBStatus(YBCPgResetTransactionReadPoint());
}

return CurrentSnapshot;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Parsed test spec with 2 sessions

starting permutation: update_k1_in_s2_do update_k1_in_s1_do c2 select c1
step update_k1_in_s2_do: DO $$ BEGIN UPDATE test SET v=v+2 WHERE k=1; END $$;
step update_k1_in_s1_do: DO $$ BEGIN UPDATE test SET v=v*5 WHERE k=1; END $$; <waiting ...>
step c2: COMMIT;
step update_k1_in_s1_do: <... completed>
step select: SELECT * FROM test;
k v

1 15
step c1: COMMIT;

starting permutation: update_k1_in_s2_call update_k1_in_s1_call c2 select c1
step update_k1_in_s2_call: CALL update_k1_in_s2();
step update_k1_in_s1_call: CALL update_k1_in_s1(); <waiting ...>
step c2: COMMIT;
step update_k1_in_s1_call: <... completed>
step select: SELECT * FROM test;
k v

1 15
step c1: COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Tests to ensure that DO and CALL are retried for ensuring Read Committed semantics

setup
{
CREATE TABLE test (k INT PRIMARY KEY, v INT);
INSERT INTO test VALUES (1, 1);
CREATE PROCEDURE update_k1_in_s1() AS $$
BEGIN
UPDATE test SET v=v*5 WHERE k=1;
END $$ LANGUAGE PLPGSQL;
CREATE PROCEDURE update_k1_in_s2() AS $$
BEGIN
UPDATE test SET v=v+2 WHERE k=1;
END $$ LANGUAGE PLPGSQL;
}

teardown
{
DROP PROCEDURE update_k1_in_s1;
DROP PROCEDURE update_k1_in_s2;
DROP TABLE test;
}

session "s1"
setup { BEGIN ISOLATION LEVEL READ COMMITTED; }
step "update_k1_in_s1_do" { DO $$ BEGIN UPDATE test SET v=v*5 WHERE k=1; END $$; }
step "update_k1_in_s1_call" { CALL update_k1_in_s1(); }
step "select" { SELECT * FROM test; }
step "c1" { COMMIT; }

session "s2"
setup { BEGIN ISOLATION LEVEL READ COMMITTED; }
step "update_k1_in_s2_do" { DO $$ BEGIN UPDATE test SET v=v+2 WHERE k=1; END $$; }
step "update_k1_in_s2_call" { CALL update_k1_in_s2(); }
step "c2" { COMMIT; }

permutation "update_k1_in_s2_do" "update_k1_in_s1_do" "c2" "select" "c1"
permutation "update_k1_in_s2_call" "update_k1_in_s1_call" "c2" "select" "c1"
3 changes: 2 additions & 1 deletion src/postgres/src/test/isolation/yb_pg_isolation_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ test: yb_pg_eval-plan-qual
test: yb_read_committed_update_and_explicit_locking
test: yb_read_committed_insert
test: yb_read_committed_test_internal_savepoint
test: yb_read_committed_test_do_call

# Skip locked related tests from Pg
test: yb_pg_skip-locked
Expand Down Expand Up @@ -30,4 +31,4 @@ test: read-only-anomaly
test: nowait-2
test: nowait-3
test: aborted-keyrevoke
test: delete-abort-savept-2
test: delete-abort-savept-2
5 changes: 5 additions & 0 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,11 @@ PgClientSession::SetupSession(const PgPerformRequestPB& req, CoarseTimePoint dea
Transaction(kind) = VERIFY_RESULT(RestartTransaction(session, transaction));
transaction = Transaction(kind).get();
} else {
RSTATUS_DCHECK(
kind == PgClientSessionKind::kPlain ||
options.read_time_manipulation() == ReadTimeManipulation::NONE,
IllegalState,
"Read time manipulation can't be specified for kDdl/ kCatalog transactions");
ProcessReadTimeManipulation(options.read_time_manipulation());
if (options.has_read_time() &&
(options.read_time().has_read_ht() || options.use_catalog_session())) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/cql/ql/ptree/pt_expr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ Status PTRelationExpr::AnalyzeOperator(SemContext *sem_context,
// Allow only expressions involving columns. Block subscripted/json col+operators.
if (op1->expr_op() != ExprOperator::kRef) {
return sem_context->Error(this,
"Parital index where clause only allows operators on table columns",
"Partial index where clause only allows operators on table columns",
ErrorCode::FEATURE_NOT_SUPPORTED);
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/yb/yql/pggate/pg_txn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ Status PgTxnManager::RestartTransaction() {
/* This is called at the start of each statement in READ COMMITTED isolation level */
Status PgTxnManager::ResetTransactionReadPoint() {
RSTATUS_DCHECK(!ddl_mode_, IllegalState,
"READ COMMITTED semantics don't apply to DDL transactions");
read_time_manipulation_ = tserver::ReadTimeManipulation::RESET;
read_time_for_follower_reads_ = HybridTime();
RETURN_NOT_OK(UpdateReadTimeForFollowerReadsIfRequired());
Expand Down Expand Up @@ -429,8 +431,13 @@ uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options)
options->set_defer_read_point(true);
need_defer_read_point_ = false;
}
options->set_read_time_manipulation(read_time_manipulation_);
read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
if (!ddl_mode_) {
// The state in read_time_manipulation_ is only for kPlain transactions. And if YSQL switches to
// kDdl mode for some time, we should keep read_time_manipulation_ as is so that once YSQL
// switches back to kPlain mode, the read_time_manipulation_ is not lost.
options->set_read_time_manipulation(read_time_manipulation_);
read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
}
if (read_time_for_follower_reads_) {
ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time());
}
Expand Down

0 comments on commit a224db2

Please sign in to comment.