[#20505] YSQL: Batch explicit row-level locking
Summary:
The `SELECT … FOR UPDATE` command, when applied to multiple keys, currently locks each row serially. This approach results in increased latency due to multiple round-trip communications (RPC requests) to the DocDB storage layer for lock acquisition. This can significantly impact the performance of applications relying on transactional consistency for multi-row operations.

The primary goal of this revision is to enhance the performance of multi-key `SELECT … FOR UPDATE` queries by implementing a batched locking mechanism. This approach will aggregate lock requests for multiple rows and execute them in a single RPC call to the DocDB layer, thereby reducing latency and improving overall transaction throughput.

This revision applies the optimization to all forms of explicit row locking supported by PostgreSQL (`FOR UPDATE`, `FOR NO KEY UPDATE`, `FOR SHARE`, `FOR KEY SHARE`). In terms of implementation, this means buffering operations for multiple `RowMarkType` values.
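
For illustration, with batching enabled each of the following statement forms (written against the example table `tbl` that is created below) is covered by the optimization:

```
SELECT * FROM tbl WHERE k <= 100 FOR UPDATE;
SELECT * FROM tbl WHERE k <= 100 FOR NO KEY UPDATE;
SELECT * FROM tbl WHERE k <= 100 FOR SHARE;
SELECT * FROM tbl WHERE k <= 100 FOR KEY SHARE;
```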

To control the batch size, use the GUC `yb_explicit_row_locking_batch_size` as follows: `SET yb_explicit_row_locking_batch_size = <size>;`, where `<size>` is a positive integer. Note that this GUC defaults to 1, which disables the feature; batching is applied only when the value is greater than 1 and the wait policy is not `SKIP LOCKED`.
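
A minimal per-session usage sketch, using standard PostgreSQL `SET`, `SHOW`, and `RESET`:

```
-- Enable batching for the current session (any value greater than 1 enables it)
SET yb_explicit_row_locking_batch_size = 1024;
SHOW yb_explicit_row_locking_batch_size;

-- Restore the default of 1, which disables batching
RESET yb_explicit_row_locking_batch_size;
```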

As an example, consider the following table with a single primary key column in which we insert 100 rows:

```
CREATE TABLE tbl (k INT PRIMARY KEY);
INSERT INTO tbl (SELECT i FROM generate_series(1, 100) AS i);
```

Currently, explicitly acquiring row-level locks for all 100 rows results in `Storage Read Requests: 101`, as we are performing one initial read, and then one read for every row we intend to acquire a lock for:

```
yugabyte=# EXPLAIN (ANALYZE, DIST) SELECT * FROM tbl WHERE k <= 100 FOR UPDATE;
                                                QUERY PLAN
-----------------------------------------------------------------------------------------------------------
 LockRows  (cost=0.00..112.50 rows=1000 width=36) (actual time=7.027..156.969 rows=100 loops=1)
   ->  Seq Scan on tbl  (cost=0.00..102.50 rows=1000 width=36) (actual time=3.021..3.606 rows=100 loops=1)
         Remote Filter: (k <= 100)
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 2.488 ms
         Storage Table Rows Scanned: 100
 Planning Time: 0.098 ms
 Execution Time: 157.274 ms
 Storage Read Requests: 101
 Storage Read Execution Time: 139.504 ms
 Storage Rows Scanned: 200
 Storage Write Requests: 0
 Catalog Read Requests: 0
 Catalog Write Requests: 0
 Storage Flush Requests: 0
 Storage Execution Time: 139.504 ms
 Peak Memory Usage: 24 kB
(17 rows)
```

By reducing the number of RPCs with this optimization, we end up with `Storage Read Requests: 2`. This is because we perform one initial read request followed by a single request for all the locks, which significantly reduces the total execution time:

```
yugabyte=# SET yb_explicit_row_locking_batch_size = 1024;
SET
yugabyte=# EXPLAIN (ANALYZE, DIST) SELECT * FROM tbl WHERE k <= 100 FOR UPDATE;
                                                QUERY PLAN
-----------------------------------------------------------------------------------------------------------
 LockRows  (cost=0.00..112.50 rows=1000 width=36) (actual time=3.883..19.285 rows=100 loops=1)
   ->  Seq Scan on tbl  (cost=0.00..102.50 rows=1000 width=36) (actual time=3.810..4.375 rows=100 loops=1)
         Remote Filter: (k <= 100)
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 2.532 ms
         Storage Table Rows Scanned: 100
 Planning Time: 0.970 ms
 Execution Time: 19.621 ms
 Storage Read Requests: 2
 Storage Read Execution Time: 2.535 ms
 Storage Rows Scanned: 200
 Storage Write Requests: 0
 Catalog Read Requests: 0
 Catalog Write Requests: 0
 Storage Flush Requests: 0
 Storage Execution Time: 2.535 ms
 Peak Memory Usage: 24 kB
(17 rows)
```
Jira: DB-9512

Test Plan:
Added a new SQL regress test `yb_explicit_row_lock_batching.sql/.out` to `yb_misc_serial4_schedule`, which can be run with the following command:

`./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressMisc#testPgRegressMiscSerial4'`

The test is based on `yb_explicit_row_lock_planning.sql/.out`, but includes `EXPLAIN (ANALYZE, DIST)` commands with deterministic fields to track the number of requests, ensuring that we flush only once.
Also, there are some newly added cases (see the sketches after this list), such as:
- Simple `JOIN` with top-level locking
- `JOIN` with leaf-level locking (sub-query)
- A `LIMIT` that returns fewer rows than the filter matches
- A filter evaluated on the PostgreSQL side, using `NOW()`
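
The sketches below only illustrate the shape of these cases; table and column names (`t1`, `t2`, `ts`) are hypothetical and not the ones used in the regress test:

```
-- Simple JOIN with top-level locking
SELECT * FROM t1 JOIN t2 ON t1.k = t2.k FOR UPDATE;

-- JOIN with leaf-level locking inside a sub-query
SELECT * FROM t1 JOIN (SELECT * FROM t2 FOR UPDATE) s ON t1.k = s.k;

-- LIMIT returning fewer rows than the filter matches
SELECT * FROM t1 WHERE k <= 100 LIMIT 10 FOR UPDATE;

-- Filter evaluated on the PostgreSQL side, using NOW()
SELECT * FROM t1 WHERE ts <= NOW() FOR UPDATE;
```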

Reviewers: kramanathan, dmitry

Reviewed By: kramanathan, dmitry

Subscribers: yql, smishra, patnaik.balivada

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D32543
andyyu8588 committed Apr 24, 2024
1 parent 55278fe commit 09e46ba
Showing 20 changed files with 1,925 additions and 64 deletions.
72 changes: 53 additions & 19 deletions src/postgres/src/backend/access/yb_access/yb_scan.c
@@ -3616,17 +3616,50 @@ HeapTuple YBCFetchTuple(Relation relation, Datum ybctid)
return tuple;
}

HTSU_Result
YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy pg_wait_policy,
EState* estate)
// TODO: Substitute the YBSetRowLockPolicy with this function
static int
YBCGetRowLockPolicy(LockWaitPolicy pg_wait_policy)
{
int docdb_wait_policy;

YBSetRowLockPolicy(&docdb_wait_policy, pg_wait_policy);
return docdb_wait_policy;
}

/*
* The return value of this function depends on whether we are batching or not.
* Currently, batching is enabled if the GUC yb_explicit_row_locking_batch_size > 1
* and the wait policy is not "SKIP LOCKED".
* If we are batching, then the return value is just a placeholder, as we are not
* acquiring the lock on the row before returning.
* Otherwise, the returned HTSU_Result is adjusted in case of an error in acquiring the lock.
*/
HTSU_Result
YBCLockTuple(
Relation relation, Datum ybctid, RowMarkType mode,
LockWaitPolicy pg_wait_policy, EState* estate)
{
int docdb_wait_policy = YBCGetRowLockPolicy(pg_wait_policy);
const YBCPgExplicitRowLockParams lock_params = {
.rowmark = mode,
.pg_wait_policy = pg_wait_policy,
.docdb_wait_policy = docdb_wait_policy};

const Oid relfile_oid = YbGetRelfileNodeId(relation);
const Oid db_oid = YBCGetDatabaseOid(relation);

if (yb_explicit_row_locking_batch_size > 1
&& lock_params.pg_wait_policy != LockWaitSkip)
{
// TODO: Error message requires conversion
HandleYBStatus(YBCAddExplicitRowLockIntent(
relfile_oid, ybctid, db_oid, &lock_params, YBCIsRegionLocal(relation)));
YBCPgAddIntoForeignKeyReferenceCache(relfile_oid, ybctid);
return HeapTupleMayBeUpdated;
}

YBCPgStatement ybc_stmt;
HandleYBStatus(YBCPgNewSelect(YBCGetDatabaseOid(relation),
YbGetRelfileNodeId(relation),
HandleYBStatus(YBCPgNewSelect(db_oid,
relfile_oid,
NULL /* prepare_params */,
YBCIsRegionLocal(relation),
&ybc_stmt));
@@ -3637,9 +3670,9 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy p

YBCPgExecParameters exec_params = {0};
exec_params.limit_count = 1;
exec_params.rowmark = mode;
exec_params.pg_wait_policy = pg_wait_policy;
exec_params.docdb_wait_policy = docdb_wait_policy;
exec_params.rowmark = lock_params.rowmark;
exec_params.pg_wait_policy = lock_params.pg_wait_policy;
exec_params.docdb_wait_policy = lock_params.docdb_wait_policy;
exec_params.stmt_in_txn_limit_ht_for_reads =
estate->yb_exec_params.stmt_in_txn_limit_ht_for_reads;

@@ -3651,7 +3684,7 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy p
/*
* Execute the select statement to lock the tuple with given ybctid.
*/
HandleYBStatus(YBCPgExecSelect(ybc_stmt, &exec_params /* exec_params */));
HandleYBStatus(YBCPgExecSelect(ybc_stmt, &exec_params));

bool has_data = false;
Datum *values = NULL;
@@ -3662,15 +3695,9 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy p
* Below is done to ensure the read request is flushed to tserver.
*/
HandleYBStatus(
YBCPgDmlFetch(
ybc_stmt,
0,
(uint64_t *) values,
nulls,
&syscols,
&has_data));
YBCPgAddIntoForeignKeyReferenceCache(
YbGetRelfileNodeId(relation), ybctid);
YBCPgDmlFetch(
ybc_stmt, 0, (uint64_t *) values, nulls, &syscols, &has_data));
YBCPgAddIntoForeignKeyReferenceCache(relfile_oid, ybctid);
}
PG_CATCH();
{
@@ -3700,6 +3727,13 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy p
return res;
}

void
YBCFlushTupleLocks()
{
// TODO: Error message requires conversion
HandleYBStatus(YBCFlushExplicitRowLockIntents());
}

/*
* ANALYZE support: take random sample of a YB table data
*/
3 changes: 3 additions & 0 deletions src/postgres/src/backend/executor/execProcnode.c
@@ -835,6 +835,9 @@ ExecShutdownNode(PlanState *node)
case T_HashJoinState:
ExecShutdownHashJoin((HashJoinState *) node);
break;
case T_LockRowsState:
ExecShutdownLockRows((LockRowsState *) node);
break;
default:
break;
}
24 changes: 20 additions & 4 deletions src/postgres/src/backend/executor/nodeLockRows.c
@@ -208,11 +208,13 @@ ExecLockRows(PlanState *pstate)
break;
}

if (IsYBBackedRelation(erm->relation)) {
test = YBCLockTuple(erm->relation, datum, erm->markType, erm->waitPolicy,
estate);
if (IsYBBackedRelation(erm->relation))
{
test = YBCLockTuple(
erm->relation, datum, erm->markType, erm->waitPolicy, estate);
}
else {
else
{
test = heap_lock_tuple(erm->relation, &tuple,
estate->es_output_cid,
lockmode, erm->waitPolicy, true,
@@ -529,3 +531,17 @@ ExecReScanLockRows(LockRowsState *node)
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);
}

/* ----------------------------------------------------------------
* ExecShutdownLockRows
*
* YB: This flushes the explicit row lock buffer once there are no
* more rows to be locked.
*
* ----------------------------------------------------------------
*/
void
ExecShutdownLockRows(LockRowsState *node)
{
YBCFlushTupleLocks();
}
19 changes: 18 additions & 1 deletion src/postgres/src/backend/utils/misc/guc.c
@@ -195,6 +195,7 @@ static void assign_tcp_keepalives_count(int newval, void *extra);
static const char *show_tcp_keepalives_idle(void);
static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
static bool check_yb_explicit_row_locking_batch_size(int *newval, void **extra, GucSource source);
static bool check_maxconnections(int *newval, void **extra, GucSource source);
static const char *yb_show_maxconnections(void);
static bool check_max_worker_processes(int *newval, void **extra, GucSource source);
@@ -2617,7 +2618,17 @@ static struct config_int ConfigureNamesInt[] =
1024, 1, INT_MAX,
NULL, NULL, NULL
},

{
{"yb_explicit_row_locking_batch_size", PGC_USERSET, QUERY_TUNING_OTHER,
gettext_noop("Batch size of explicit row locking"),
gettext_noop("Set to 1 to conserve default behavior, "
"batching is disabled by default."),
GUC_NOT_IN_SAMPLE
},
&yb_explicit_row_locking_batch_size,
1, 1, INT_MAX,
check_yb_explicit_row_locking_batch_size, NULL, NULL
},
{
{"default_statistics_target", PGC_USERSET, QUERY_TUNING_OTHER,
gettext_noop("Sets the default statistics target."),
@@ -12438,6 +12449,12 @@ show_tcp_keepalives_count(void)
return nbuf;
}

static bool
check_yb_explicit_row_locking_batch_size(int *newval, void **extra, GucSource source)
{
return *newval > 0;
}

static bool
check_maxconnections(int *newval, void **extra, GucSource source)
{
1 change: 1 addition & 0 deletions src/postgres/src/include/access/yb_scan.h
@@ -359,6 +359,7 @@ extern void ybcIndexCostEstimate(struct PlannerInfo *root, IndexPath *path,
extern HeapTuple YBCFetchTuple(Relation relation, Datum ybctid);
extern HTSU_Result YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode,
LockWaitPolicy wait_policy, EState* estate);
extern void YBCFlushTupleLocks();

/*
* ANALYZE support: sampling of table data
1 change: 1 addition & 0 deletions src/postgres/src/include/executor/nodeLockRows.h
@@ -19,5 +19,6 @@
extern LockRowsState *ExecInitLockRows(LockRows *node, EState *estate, int eflags);
extern void ExecEndLockRows(LockRowsState *node);
extern void ExecReScanLockRows(LockRowsState *node);
extern void ExecShutdownLockRows(LockRowsState *node);

#endif /* NODELOCKROWS_H */
2 changes: 2 additions & 0 deletions src/postgres/src/include/utils/guc.h
@@ -264,6 +264,8 @@ extern int yb_bnl_batch_size;
extern bool yb_bnl_optimize_first_batch;
extern bool yb_bnl_enable_hashing;

extern int yb_explicit_row_locking_batch_size;

extern bool yb_lock_pk_single_rpc;

extern int yb_toast_catcache_threshold;