From d0841642ab1c9126cc2c02614bca926393e923a3 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Wed, 4 May 2022 15:43:19 -0700 Subject: [PATCH] [#12184] YSQL: Flush and wait for buffered ops before executing statements with non-transactional side-effects Summary: YSQL buffers tserver operations (writes specifically) unless it hits some condition that forces it to flush the buffer and wait for the response. Some conditions that force waiting for a buffer's response are - completion of a txn, completion of an exception handling block in plpgsql, a read operation, or a write to a key which already has a buffered write. With buffering, execution can move on to later statements unless a flush and response wait is required based on conditions similar to those mentioned above. For example - with autocommit mode off, writes for a statement (like INSERT/UPDATE) are not flushed until required. Instead, they are buffered. This is okay because, even before flushing, YSQL knows the number of inserts/updates to be done and returns that number to the user client (as "INSERT x"). If an error occurs in the rpc, it will anyway be caught in some later flush before the txn commits (this was an optimization introduced in D7782). Allowing execution to move on to later statements without waiting for the actual work to be done on the tablet servers helps improve performance by buffering and reduce latency. But, the downside is that any database violations associated with the buffered operations might be observed after execution has moved to later statements. The downside is not an issue in many cases -- such as the optimization mentioned above to not wait for writes of a DML in a transaction to complete. But, this can cause issues with exception blocks as seen in gh issue #12184. Incorrect behaviour is observed when an exception, that occurs due to some statement's rpcs, is seen after some non-reversible side-effect(s) have also been executed. 
Non-reversible side-effects are those which don't modify the database and hence can't be undone when an exception is caught, or when a txn is rolled back. E.g: create table test (k int primary key, v int); insert into test values (1, 1); create function ... returns table(z text) begin z := "before insert" insert into test values (1, 1); -- operation is buffered, will result in duplicate key violation return next; -- statement sends "before insert" to user. This is a non-transactional side effect which can't be undone exception end; The fix is: buffered operations should be flushed and waited for before executing any statements with non-reversible side effects in plpgsql. Test Plan: ./yb_build.sh --java-test org.yb.pgsql.TestPgRegressBufferingInPlpgsql#testPgRegressBufferingInPlpgsql Reviewers: dmitry, mtakahara Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D16809 --- .../TestPgRegressBufferingInPlpgsql.java | 33 +++++ src/postgres/src/backend/executor/functions.c | 7 + src/postgres/src/pl/plpgsql/src/pl_exec.c | 33 +++++ .../expected/yb_buffering_in_plpgsql.out | 136 ++++++++++++++++++ .../regress/sql/yb_buffering_in_plpgsql.sql | 121 ++++++++++++++++ .../regress/yb_buffering_in_plpgsql_schedule | 6 + src/yb/yql/pgwrapper/pg_op_buffering-test.cc | 36 ++++- 7 files changed, 370 insertions(+), 2 deletions(-) create mode 100644 java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressBufferingInPlpgsql.java create mode 100644 src/postgres/src/test/regress/expected/yb_buffering_in_plpgsql.out create mode 100644 src/postgres/src/test/regress/sql/yb_buffering_in_plpgsql.sql create mode 100644 src/postgres/src/test/regress/yb_buffering_in_plpgsql_schedule diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressBufferingInPlpgsql.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressBufferingInPlpgsql.java new file mode 100644 index 000000000000..ed3b6d1b3d66 --- /dev/null +++ 
b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressBufferingInPlpgsql.java @@ -0,0 +1,33 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +package org.yb.pgsql; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.yb.util.YBTestRunnerNonTsanOnly; + +/** + * Runs the pg_regress test suite on YB code. + */ +@RunWith(value=YBTestRunnerNonTsanOnly.class) +public class TestPgRegressBufferingInPlpgsql extends BasePgSQLTest { + @Override + public int getTestMethodTimeoutSec() { + return 180; + } + + @Test + public void testPgRegressBufferingInPlpgsql() throws Exception { + runPgRegressTest("yb_buffering_in_plpgsql_schedule"); + } +} diff --git a/src/postgres/src/backend/executor/functions.c b/src/postgres/src/backend/executor/functions.c index ba17e5503276..508f0f55177b 100644 --- a/src/postgres/src/backend/executor/functions.c +++ b/src/postgres/src/backend/executor/functions.c @@ -1152,6 +1152,13 @@ fmgr_sql(PG_FUNCTION_ARGS) UpdateActiveSnapshotCommandId(); } + elog(LOG, "Piyush - flushing buffered ops before next statement in an SQL language function"); + // TODO(Piyush): Flush always except for safe-situations. 
+ if (es->stmt->commandType == CMD_UPDATE || + es->stmt->commandType == CMD_INSERT || + es->stmt->commandType == CMD_DELETE) + YBFlushBufferedOperations(); + postquel_start(es, fcache); } else if (!fcache->readonly_func && !pushed_snapshot) diff --git a/src/postgres/src/pl/plpgsql/src/pl_exec.c b/src/postgres/src/pl/plpgsql/src/pl_exec.c index c9b82da2c381..c5d7d8f33ae7 100644 --- a/src/postgres/src/pl/plpgsql/src/pl_exec.c +++ b/src/postgres/src/pl/plpgsql/src/pl_exec.c @@ -47,6 +47,7 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/typcache.h" +#include "yb/common/ybc_util.h" #include "plpgsql.h" @@ -1874,6 +1875,22 @@ exec_stmts(PLpgSQL_execstate *estate, List *stmts) foreach(s, stmts) { PLpgSQL_stmt *stmt = (PLpgSQL_stmt *) lfirst(s); + + /* + * Flush buffered operations before executing a statement which might have + * non-transactional side-effects that won't be reverted in case the + * buffered operations (i.e., from previous statements) lead to an + * exception. + */ + if (stmt->cmd_type != PLPGSQL_STMT_EXECSQL) { + /* + * PLPGSQL_STMT_EXECSQL commands require flushing for everything except + * UPDATE/ INSERT and DELETE. So the handling for that is present in + * exec_stmt_execsql(). + */ + YBFlushBufferedOperations(); + } + int rc = exec_stmt(estate, stmt); if (rc != PLPGSQL_RC_OK) @@ -4157,6 +4174,21 @@ exec_stmt_execsql(PLpgSQL_execstate *estate, stmt->mod_stmt_set = true; } + /* + * Flush buffered operations before executing a statement which might have + * non-transactional side-effects that won't be reverted in case the buffered + * operations (i.e., from previous statements) lead to an exception. + * + * UPDATE/ INSERT and DELETE have only transactional side-effects. If they + * call other functions which might have some statements that lead to + * non-transactional side-effects, a flush will be performed there (e.g. 
if + * the function called is a plpgsql function, the YBFlushBufferedOperations + * in exec_stmts will take care of flushing before non-transactional work is + * performed). + */ + if (!stmt->mod_stmt) + YBFlushBufferedOperations(); + /* * Set up ParamListInfo to pass to executor */ @@ -5868,6 +5900,7 @@ static int exec_run_select(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, long maxtuples, Portal *portalP) { + elog(LOG, "Piyush - exec_run_select()"); ParamListInfo paramLI; int rc; diff --git a/src/postgres/src/test/regress/expected/yb_buffering_in_plpgsql.out b/src/postgres/src/test/regress/expected/yb_buffering_in_plpgsql.out new file mode 100644 index 000000000000..b071d7003d43 --- /dev/null +++ b/src/postgres/src/test/regress/expected/yb_buffering_in_plpgsql.out @@ -0,0 +1,136 @@ +-- +-- Tests to ensure YSQL buffering (i.e., logic in pg_operation_buffer.cc and +-- related files) doesn't break any semantics of PLPGSQL. +-- +-- ***************************************************************************** +-- * Exception handling in PLPGSQL +-- ***************************************************************************** +-- +-- PLPGSQL exception blocks allow a user to execute a block of statements such +-- that if an error occurs, the changes to the database are undone/ reverted and +-- user-specified error-handling is invoked. +-- +-- The changes to the database are undone as follows - an internal savepoint is +-- registered before any code in the exception block is executed. If any error +-- occurs, it is caught and the savepoint is rolled back and released. This +-- helps revert any modifications to the database. +-- +-- However, there are some statements which don't modify the database, but have +-- other side-effects. These are not undone even if an exception occurs. A +-- simple example of this is the "return next;" statement in plpgsql. Once data +-- is sent to the user it can't be undone. 
+-- ***************************************************************************** +-- +-- ***************************************************************************** +-- * YSQL Buffering +-- ***************************************************************************** +-- YSQL buffers operations to tserver (writes specifically) unless it hits some +-- condition that forces it to flush the buffer and wait for the response. Some +-- conditions that force waiting for a buffer response are - completion of a txn, +-- completion of an exception handling block in plpgsql, a read operation, or a +-- write to a key which already has a buffered write. +-- +-- With buffering, execution can move on to later statements unless a flush and +-- response wait is required based on the conditions above. For example - with +-- autocommit mode off, writes for a statement (like INSERT/UPDATE) are not +-- flushed until required. Instead, they are buffered. This is okay because, +-- even before flushing, we would know the number of inserts/updates to be done +-- and return that number to the user client (as "INSERT x"). If an error occurs +-- in the rpc, it will anyway be caught in some later flush, but before the txn +-- commits. +-- +-- Allowing execution to move on to later statements without waiting for the +-- actual work to be done on the tablet servers helps improve performance by +-- buffering and reduce latency. +-- ***************************************************************************** +-- +-- +-- As seen in gh issue #12184, incorrect behaviour is observed with YSQL +-- buffering when an exception that occurs due to some statement's rpcs is seen +-- after a later statement which has non-reversible side-effect(s) has also been +-- executed. Buffered operations should be flushed and waited for before +-- executing any statements with non-reversible side effects in the same +-- transaction. The following tests ensure this for the various cases. 
+-- +-- 1(a) PL/pgsql: ensure statements with non-reversible side effects (i.e., non +-- transactional work) are not executed if an ealier statement caused an +-- exception. +create table t(k serial primary key, v varchar(100) not null); +create unique index t_v_unq on t(v); +insert into t(v) values ('dog'), ('cat'), ('frog'); +create or replace function f(new_v in text) + returns table(z text) + language plpgsql +as $body$ +begin + begin + z := 'return next was executed after insert, this was not expected'; + insert into t(v) values (new_v); + return next; + exception + when unique_violation then + z := 'unique_violation'; return next; + when others then + raise; + end; +end; +$body$; +select f('dog'); + f +------------------ + unique_violation +(1 row) + +-- 1(b) PL/pgsql: same case as 1(a) but the statement that does non-reversible +-- side effects (i.e., non-transactional work) is in a nested function. +create or replace function f_outer() + returns table(z text) + language plpgsql +as $body$ +begin + begin + insert into t(v) select f('dog'); + exception + when unique_violation then + z := 'unique_violation'; return next; + when others then + raise; + end; +end; +$body$; +select f_outer(); + f_outer +--------- +(0 rows) + +select * from t; + k | v +---+------------------ + 1 | dog + 6 | unique_violation + 2 | cat + 3 | frog +(4 rows) + +-- 2. SQL functions: ensure statements with non-reversible side effects (i.e., +-- non transactional work) are not executed if an ealier statement caused an +-- exception. 
+prepare dummy_query as select * from t; +create or replace function f(new_v in text) + returns table(z text) + language sql +as $body$ + insert into t(v) values ('dog'); + deallocate dummy_query; + select v from t; +$body$; +select f('dog'); +ERROR: duplicate key value violates unique constraint "t_v_unq" +execute dummy_query; -- this should find the prepared statement and run fine + k | v +---+------------------ + 1 | dog + 6 | unique_violation + 2 | cat + 3 | frog +(4 rows) \ No newline at end of file diff --git a/src/postgres/src/test/regress/sql/yb_buffering_in_plpgsql.sql b/src/postgres/src/test/regress/sql/yb_buffering_in_plpgsql.sql new file mode 100644 index 000000000000..72c92380508d --- /dev/null +++ b/src/postgres/src/test/regress/sql/yb_buffering_in_plpgsql.sql @@ -0,0 +1,121 @@ +-- +-- Tests to ensure YSQL buffering (i.e., logic in pg_operation_buffer.cc and +-- related files) doesn't break any semantics of PLPGSQL. +-- +-- ***************************************************************************** +-- * Exception handling in PLPGSQL +-- ***************************************************************************** +-- +-- PLPGSQL exception blocks allow a user to execute a block of statements such +-- that if an error occurs, the changes to the database are undone/ reverted and +-- user-specified error-handling is invoked. +-- +-- The changes to the database are undone as follows - an internal savepoint is +-- registered before any code in the exception block is executed. If any error +-- occurs, it is caught and the savepoint is rolled back and released. This +-- helps revert any modifications to the database. +-- +-- However, there are some statements which don't modify the database, but have +-- other side-effects. These are not undone even if an exception occurs. A +-- simple example of this is the "return next;" statement in plpgsql. Once data +-- is sent to the user it can't be undone. 
+-- ***************************************************************************** +-- +-- ***************************************************************************** +-- * YSQL Buffering +-- ***************************************************************************** +-- YSQL buffers operations to tserver (writes specifically) unless it hits some +-- condition that forces it to flush the buffer and wait for the response. Some +-- conditions that force waiting for a buffer response are - completion of a txn, +-- completion of an exception handling block in plpgsql, a read operation, or a +-- write to a key which already has a buffered write. +-- +-- With buffering, execution can move on to later statements unless a flush and +-- response wait is required based on the conditions above. For example - with +-- autocommit mode off, writes for a statement (like INSERT/UPDATE) are not +-- flushed until required. Instead, they are buffered. This is okay because, +-- even before flushing, we would know the number of inserts/updates to be done +-- and return that number to the user client (as "INSERT x"). If an error occurs +-- in the rpc, it will anyway be caught in some later flush, but before the txn +-- commits. +-- +-- Allowing execution to move on to later statements without waiting for the +-- actual work to be done on the tablet servers helps improve performance by +-- buffering and reduce latency. +-- ***************************************************************************** +-- +-- +-- As seen in gh issue #12184, incorrect behaviour is observed with YSQL +-- buffering when an exception that occurs due to some statement's rpcs is seen +-- after a later statement which has non-reversible side-effect(s) has also been +-- executed. Buffered operations should be flushed and waited for before +-- executing any statements with non-reversible side effects in the same +-- transaction. The following tests ensure this for the various cases. 
+-- +-- 1(a) PL/pgsql: ensure statements with non-reversible side effects (i.e., non +-- transactional work) are not executed if an ealier statement caused an +-- exception. + +create table t(k serial primary key, v varchar(100) not null); +create unique index t_v_unq on t(v); +insert into t(v) values ('dog'), ('cat'), ('frog'); + +create or replace function f(new_v in text) + returns table(z text) + language plpgsql +as $body$ +begin + begin + z := 'return next was executed after insert, this was not expected'; + insert into t(v) values (new_v); + return next; + exception + when unique_violation then + z := 'unique_violation'; return next; + when others then + raise; + end; +end; +$body$; + +select f('dog'); + +-- 1(b) PL/pgsql: same case as 1(a) but the statement that does non-reversible +-- side effects (i.e., non-transactional work) is in a nested function. + +create or replace function f_outer() + returns table(z text) + language plpgsql +as $body$ +begin + begin + insert into t(v) select f('dog'); + exception + when unique_violation then + z := 'unique_violation'; return next; + when others then + raise; + end; +end; +$body$; + +select f_outer(); +select * from t; + +-- 2. SQL functions: ensure statements with non-reversible side effects (i.e., +-- non transactional work) are not executed if an ealier statement caused an +-- exception. 
+ +prepare dummy_query as select * from t; + +create or replace function f(new_v in text) + returns table(z text) + language sql +as $body$ + insert into t(v) values ('dog'); + deallocate dummy_query; + select v from t; +$body$; + +select f('dog'); +execute dummy_query; -- this should find the prepared statement and run fine diff --git a/src/postgres/src/test/regress/yb_buffering_in_plpgsql_schedule b/src/postgres/src/test/regress/yb_buffering_in_plpgsql_schedule new file mode 100644 index 000000000000..44e7e6b67b10 --- /dev/null +++ b/src/postgres/src/test/regress/yb_buffering_in_plpgsql_schedule @@ -0,0 +1,6 @@ +# src/test/regress/yb_buffering_in_plpgsql_schedule +# +#################################################################################################### +# This includes tests related to buffering in PL/pgSQL. +#################################################################################################### +test: yb_buffering_in_plpgsql diff --git a/src/yb/yql/pgwrapper/pg_op_buffering-test.cc b/src/yb/yql/pgwrapper/pg_op_buffering-test.cc index c7078c20444f..5421b301b6d3 100644 --- a/src/yb/yql/pgwrapper/pg_op_buffering-test.cc +++ b/src/yb/yql/pgwrapper/pg_op_buffering-test.cc @@ -114,14 +114,26 @@ TEST_F(PgOpBufferingTest, YB_DISABLE_TEST_IN_TSAN(GeneralOptimization)) { " INSERT INTO $0 VALUES (123);" \ "END$$$$;", kTable)); - const auto proc_insert_rpc_count = ASSERT_RESULT(write_rpc_watcher_->Delta([&conn]() { + auto proc_insert_rpc_count = ASSERT_RESULT(write_rpc_watcher_->Delta([&conn]() { + return conn.Execute("CALL test()"); + })); + ASSERT_EQ(proc_insert_rpc_count, 1); + + ASSERT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE PROCEDURE test() LANGUAGE sql AS $$$$" \ + " INSERT INTO $0 SELECT s FROM generate_series(1001, 1005) AS s; " \ + " INSERT INTO $0 VALUES (1011), (1012), (1013);" \ + " INSERT INTO $0 VALUES (1021);" \ + " INSERT INTO $0 VALUES (1022);" \ + " INSERT INTO $0 VALUES (1023);$$$$", kTable)); + proc_insert_rpc_count = 
ASSERT_RESULT(write_rpc_watcher_->Delta([&conn]() { return conn.Execute("CALL test()"); })); ASSERT_EQ(proc_insert_rpc_count, 1); } // The test checks that buffering mechanism splits operations into batches with respect to -// 'ysql_session_max_batch_size' configuration parameter. This paramenter can be changed via GUC. +// 'ysql_session_max_batch_size' configuration parameter. This parameter can be changed via GUC. TEST_F(PgOpBufferingTest, YB_DISABLE_TEST_IN_TSAN(MaxBatchSize)) { auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(CreateTable(&conn)); @@ -137,6 +149,26 @@ TEST_F(PgOpBufferingTest, YB_DISABLE_TEST_IN_TSAN(MaxBatchSize)) { })); ASSERT_EQ(write_rpc_count, std::ceil(static_cast(items_for_insert) / max_batch_size)); } + + ASSERT_OK(conn.ExecuteFormat("truncate $0", kTable)); + for (size_t i = 0; i < max_insert_count; ++i) { + const auto items_for_insert = i + 1; + const auto write_rpc_count = ASSERT_RESULT(write_rpc_watcher_->Delta( + [&conn, start = i * 100 + 1, end = i * 100 + items_for_insert]() { + RETURN_NOT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE PROCEDURE f() " + "LANGUAGE plpgsql " + "as $$body$$ " + "BEGIN " + " FOR i in $1..$2 LOOP " + " INSERT INTO $0 VALUES (i); " + " END LOOP; " + "END; " + "$$body$$;", kTable, start, end)); + return conn.ExecuteFormat("CALL f()"); + })); + ASSERT_EQ(write_rpc_count, std::ceil(static_cast(items_for_insert) / max_batch_size)); + } } // The test checks that buffering mechanism flushes currently buffered operations in case of