Skip to content

Commit

Permalink
[BACKPORT 2024.1][#22449] CDCSDK: Introduce support for wal2json plugin
Browse files Browse the repository at this point in the history
Summary:
Original commit: b361d4e / D35521
This diff introduces the support for `wal2json` output plugin with the PG compatible logical replication support.

Key points:

  # A commit from PG (postgres/postgres@87c1dd2) was needed. This will be added in a separate revision (https://phorge.dev.yugabyte.com/D35740)

  # Like test_decoding this plugin does not send the relation object. So the schema refresh callback is a NOOP.

  # The sql and expected files from the wal2json repo and the corresponding PgRegressTests have not been added since it requires support for PG functions such as `pg_logical_slot_get_changes` which is currently unsupported. They will be imported separately as part of issue: #22687

######Backport Description
Minor merge conflicts were encountered in `src/postgres/contrib/Makefile`
Jira: DB-11366

Test Plan:
Jenkins: test regex: .*ReplicationSlot.*

./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot#testWithWal2JsonPlugin'

Manually tested with pg_recvlogical

Reviewers: stiwary, asrinivasan, skumar

Reviewed By: stiwary

Subscribers: ycdcxcluster, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D35751
  • Loading branch information
Sumukh-Phalgaonkar committed Jun 12, 2024
1 parent 8ffb98c commit 650d80a
Show file tree
Hide file tree
Showing 7 changed files with 3,569 additions and 3 deletions.
129 changes: 127 additions & 2 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgReplicationSlot.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private static String toString(ByteBuffer buffer) {
return new String(source, offset, length);
}

private List<String> receiveTestDecodingMessages(PGReplicationStream stream, int count)
private List<String> receiveStringMessages(PGReplicationStream stream, int count)
throws Exception {
List<String> result = new ArrayList<String>(count);
for (int index = 0; index < count; index++) {
Expand Down Expand Up @@ -2165,7 +2165,7 @@ public void testWithTestDecodingPlugin() throws Exception {
.start();

List<String> result = new ArrayList<String>();
result.addAll(receiveTestDecodingMessages(stream, 36));
result.addAll(receiveStringMessages(stream, 36));

List<String> expectedResult = new ArrayList<String>() {
{
Expand Down Expand Up @@ -2505,4 +2505,129 @@ public void testDDLWithRestart() throws Exception {
assertEquals(expectedResult, result);
stream.close();
}

@Test
public void testWithWal2JsonPlugin() throws Exception {
try (Statement stmt = connection.createStatement()) {
stmt.execute("DROP TABLE IF EXISTS t1");
stmt.execute("DROP TABLE IF EXISTS t2");
stmt.execute("DROP TABLE IF EXISTS t3");
stmt.execute("CREATE TABLE t1 (a int primary key, b text, c bool)");
stmt.execute("CREATE TABLE t2 (a int primary key, b text, c bool)");
stmt.execute("CREATE TABLE t3 (a int primary key, b text, c bool)");

// CHANGE is the default but we do it explicitly so that the tests do not need changing if we
// change the default.
stmt.execute("ALTER TABLE t1 REPLICA IDENTITY CHANGE");
stmt.execute("ALTER TABLE t2 REPLICA IDENTITY FULL");
stmt.execute("ALTER TABLE t3 REPLICA IDENTITY DEFAULT");
}

String slotName = "test_with_wal2json";
Connection conn =
getConnectionBuilder().withTServer(0).replicationConnect();
PGReplicationConnection replConnection = conn.unwrap(PGConnection.class).getReplicationAPI();

createSlot(replConnection, slotName, "wal2json");
try (Statement stmt = connection.createStatement()) {
stmt.execute("INSERT INTO t1 VALUES(1, 'abcd', true)");
stmt.execute("INSERT INTO t1 VALUES(2, 'defg', true)");
stmt.execute("INSERT INTO t1 VALUES(3, 'hijk', false)");
stmt.execute("UPDATE t1 SET b = 'updated_abcd' WHERE a = 1");
stmt.execute("UPDATE t1 SET b = NULL, c = false WHERE a = 2");
stmt.execute("DELETE FROM t1 WHERE a = 2");

stmt.execute("INSERT INTO t2 VALUES(1, 'abcd', true)");
stmt.execute("UPDATE t2 SET b = 'updated_abcd' WHERE a = 1");
stmt.execute("DELETE FROM t2 WHERE a = 1");

stmt.execute("INSERT INTO t3 VALUES(1, 'abcd', true)");
stmt.execute("UPDATE t3 SET b = 'updated_abcd' WHERE a = 1");
stmt.execute("DELETE FROM t3 WHERE a = 1");
}

PGReplicationStream stream = replConnection.replicationStream()
.logical()
.withSlotName(slotName)
.withStartPosition(LogSequenceNumber.valueOf(0L))
.withSlotOption("include-xids", true)
.start();

List<String> result = new ArrayList<String>();
result.addAll(receiveStringMessages(stream, 12));

List<String> expectedResult = new ArrayList<String>() {
{
add(
"{\"xid\":2,\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"t1\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[1,\"abcd\",true]}]}"
);
add(
"{\"xid\":3,\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"t1\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[2,\"defg\",true]}]}"
);
add(
"{\"xid\":4,\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"t1\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[3,\"hijk\",false]}]}"
);
add(
"{\"xid\":5,\"change\":[{\"kind\":\"update\",\"schema\":\"public\",\"table\":\"t1\","
+"\"columnnames\":[\"a\",\"b\"],\"columntypes\":[\"integer\",\"text\"],"
+"\"columnvalues\":[1,\"updated_abcd\"],\"oldkeys\":{\"keynames\":[],\"keytypes\":[],"
+"\"keyvalues\":[]}}]}"
);
add(
"{\"xid\":6,\"change\":[{\"kind\":\"update\",\"schema\":\"public\",\"table\":\"t1\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[2,null,false],\"oldkeys\":{\"keynames\":[],\"keytypes\":[],"
+"\"keyvalues\":[]}}]}"
);
add(
"{\"xid\":7,\"change\":[{\"kind\":\"delete\",\"schema\":\"public\",\"table\":\"t1\","
+"\"oldkeys\":{\"keynames\":[\"a\"],\"keytypes\":[\"integer\"],\"keyvalues\":[2]}}]}"
);
add(
"{\"xid\":8,\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"t2\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[1,\"abcd\",true]}]}"
);
add(
"{\"xid\":9,\"change\":[{\"kind\":\"update\",\"schema\":\"public\",\"table\":\"t2\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[1,\"updated_abcd\",true],"
+"\"oldkeys\":{\"keynames\":[\"a\",\"b\",\"c\"],"
+"\"keytypes\":[\"integer\",\"text\",\"boolean\"],\"keyvalues\":[1,\"abcd\",true]}}]}"
);
add(
"{\"xid\":10,\"change\":[{\"kind\":\"delete\","
+"\"schema\":\"public\",\"table\":\"t2\","
+"\"oldkeys\":{\"keynames\":[\"a\",\"b\",\"c\"],"
+"\"keytypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"keyvalues\":[1,\"updated_abcd\",true]}}]}"
);
add(
"{\"xid\":11,\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"t3\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[1,\"abcd\",true]}]}"
);
add(
"{\"xid\":12,\"change\":[{\"kind\":\"update\",\"schema\":\"public\",\"table\":\"t3\","
+"\"columnnames\":[\"a\",\"b\",\"c\"],\"columntypes\":[\"integer\",\"text\",\"boolean\"],"
+"\"columnvalues\":[1,\"updated_abcd\",true],"
+"\"oldkeys\":{\"keynames\":[],\"keytypes\":[],\"keyvalues\":[]}}]}"
);
add(
"{\"xid\":13,\"change\":[{\"kind\":\"delete\",\"schema\":\"public\",\"table\":\"t3\","
+"\"oldkeys\":{\"keynames\":[\"a\"],\"keytypes\":[\"integer\"],\"keyvalues\":[1]}}]}"
);
}
};

assertEquals(expectedResult, result);

stream.close();
}
}
3 changes: 2 additions & 1 deletion src/postgres/contrib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ SUBDIRS = \
vacuumlo \
yb_pg_metrics \
yb_test_extension \
yb_ycql_utils
yb_ycql_utils \
wal2json

ifeq ($(with_openssl),yes)
SUBDIRS += sslinfo
Expand Down
6 changes: 6 additions & 0 deletions src/postgres/contrib/wal2json/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Generated subdirectories
/log/
/results/
/output_iso/
/tmp_check/
/tmp_check_iso/
27 changes: 27 additions & 0 deletions src/postgres/contrib/wal2json/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Copyright (c) 2013-2024, Euler Taveira de Oliveira
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

* Neither the name of the Euler Taveira de Oliveira nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
49 changes: 49 additions & 0 deletions src/postgres/contrib/wal2json/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# contrib/wal2json/Makefile

MODULES = wal2json
PGFILEDESC = "wal2json - JSON output plugin for changeset extraction"

REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids \
include_domain_data_type truncate type_oid actions position default \
pk rename_column numeric_data_types_as_string

EXTRA_CLEAN = $(pg_regress_clean_files)

ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/wal2json
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

# message API is available in 9.6+
ifneq (,$(findstring $(MAJORVERSION),9.4 9.5))
REGRESS := $(filter-out message, $(REGRESS))
endif

# truncate API is available in 11+
ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10))
REGRESS := $(filter-out truncate, $(REGRESS))
endif

# actions API is available in 11+
# this test should be executed in prior versions, however, truncate will fail.
ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10))
REGRESS := $(filter-out actions, $(REGRESS))
endif

# make installcheck
#
# It can be run but you need to add the following parameters to
# postgresql.conf:
#
# wal_level = logical
# max_replication_slots = 10
#
# Also, you should start the server before executing it.
2 changes: 2 additions & 0 deletions src/postgres/contrib/wal2json/logical.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
wal_level = logical
max_replication_slots = 4
Loading

0 comments on commit 650d80a

Please sign in to comment.