Skip to content

Commit

Permalink
New wal2json format
Browse files Browse the repository at this point in the history
This is a new format for wal2json. You can choose it using
option 'format-version' = 2. This format is completely different from
version 1.

Features are:

* one JSON per tuple;
* each JSON has an "action" (BEGIN, COMMIT, INSERT, UPDATE, DELETE,
MESSAGE);
* one (optional) JSON object for BEGIN/COMMIT;
* BEGIN contains xid, timestamp, and lsn;
* COMMIT contains xid, timestamp, and lsn;
* INSERT/UPDATE/DELETE contains lsn, schema, table, columns, identity;
* "columns" and "identity" are arrays of elements;
* each "columns" element is an object that contains name, type, value,
and optional;
* "identity" is an array of elements (REPLICA IDENTITY for UPDATE /
DELETE statements) that contains name, type, and value;
* MESSAGE contains xid, timestamp, lsn, transactional, prefix, and
content.

This new format solves the big transaction issue (that consumes a lot of
memory) since tuples aren't accumulate until the end of transaction to
be written. Users can control transactions using option
include-transaction that will emit a JSON at the beginning of the
transaction and another one at the end of the transaction.
  • Loading branch information
Euler Taveira committed Dec 16, 2019
1 parent da90c76 commit 1b0cbac
Show file tree
Hide file tree
Showing 43 changed files with 1,066 additions and 74 deletions.
16 changes: 15 additions & 1 deletion expected/bytea.out

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions expected/cmdline.out
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2js
init
(1 row)

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nosuchopt', '42');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'nosuchopt', '42');
ERROR: option "nosuchopt" = "42" is unknown
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-unchanged-toast', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-unchanged-toast', '1');
ERROR: parameter "include-unchanged-toast" was deprecated
-- don't include not-null constraint by default
CREATE TABLE table_optional (
Expand All @@ -22,7 +22,7 @@ INSERT INTO table_optional (b, c) VALUES(NULL, TRUE);
UPDATE table_optional SET b = 123 WHERE a = 1;
DELETE FROM table_optional WHERE a = 1;
DROP TABLE table_optional;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'include-not-null', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '0', 'include-not-null', '1');
data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[]}
Expand All @@ -35,14 +35,14 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
-- By default don't write in chunks
CREATE TABLE x ();
DROP TABLE x;
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '0');
data
---------------
{"change":[]}
{"change":[]}
(2 rows)

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'write-in-chunks', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '0', 'write-in-chunks', '1');
data
-------------
{"change":[
Expand Down
16 changes: 15 additions & 1 deletion expected/delete1.out
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ DELETE FROM table_without_pk WHERE b = 1;
DELETE FROM table_with_pk WHERE b = 1;
-- DELETE: unique
DELETE FROM table_with_unique WHERE b = 1;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'include-typmod', '0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0');
WARNING: table "table_without_pk" without primary key or replica identity is nothing
WARNING: table "table_with_unique" without primary key or replica identity is nothing
data
Expand Down Expand Up @@ -105,6 +105,20 @@ WARNING: table "table_with_unique" without primary key or replica identity is n
}
(3 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
WARNING: no tuple identifier for DELETE in table "public"."table_without_pk"
WARNING: no tuple identifier for DELETE in table "public"."table_with_unique"
data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_pk","identity":[{"name":"b","type":"smallint","value":1},{"name":"c","type":"integer","value":2},{"name":"d","type":"bigint","value":3}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(7 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
Expand Down
28 changes: 27 additions & 1 deletion expected/delete2.out
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ALTER TABLE table_without_pk REPLICA IDENTITY DEFAULT;
ALTER TABLE table_with_unique REPLICA IDENTITY NOTHING;
DELETE FROM table_with_unique WHERE b = 1;
ALTER TABLE table_with_unique REPLICA IDENTITY DEFAULT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'include-typmod', '0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0');
WARNING: table "table_with_pk" without primary key or replica identity is nothing
WARNING: table "table_without_pk" without primary key or replica identity is nothing
WARNING: table "table_with_unique" without primary key or replica identity is nothing
Expand Down Expand Up @@ -124,6 +124,32 @@ WARNING: table "table_with_unique" without primary key or replica identity is n
}
(9 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
WARNING: no tuple identifier for DELETE in table "public"."table_with_pk"
WARNING: no tuple identifier for DELETE in table "public"."table_without_pk"
WARNING: no tuple identifier for DELETE in table "public"."table_with_unique"
data
----------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(18 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
Expand Down
31 changes: 30 additions & 1 deletion expected/delete3.out
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ ALTER TABLE table_without_pk REPLICA IDENTITY DEFAULT;
ALTER TABLE table_with_unique REPLICA IDENTITY FULL;
DELETE FROM table_with_unique WHERE b = 1;
ALTER TABLE table_with_unique REPLICA IDENTITY DEFAULT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'include-typmod', '0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0');
data
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{ +
Expand Down Expand Up @@ -168,6 +168,35 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pre
}
(10 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
data
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_pk","identity":[{"name":"a","type":"smallint","value":1},{"name":"b","type":"smallint","value":1},{"name":"c","type":"integer","value":2},{"name":"d","type":"bigint","value":3},{"name":"e","type":"numeric(5,3)","value":3.540},{"name":"f","type":"real","value":876.563},{"name":"g","type":"double precision","value":1.23},{"name":"h","type":"character(10)","value":"teste "},{"name":"i","type":"character varying(30)","value":"testando"},{"name":"j","type":"text","value":"um texto longo"},{"name":"k","type":"bit varying(20)","value":"001110010101010"},{"name":"l","type":"timestamp without time zone","value":"Sat Nov 02 17:30:52 2013"},{"name":"m","type":"date","value":"02-04-2013"},{"name":"n","type":"boolean","value":true},{"name":"o","type":"json","value":"{ \"a\": 123 }"},{"name":"p","type":"tsvector","value":"'Old' 'Parr'"}]}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_pk","identity":[{"name":"a","type":"smallint","value":2},{"name":"b","type":"smallint","value":4},{"name":"c","type":"integer","value":5},{"name":"d","type":"bigint","value":6},{"name":"e","type":"numeric(5,3)","value":3.540},{"name":"f","type":"real","value":876.563},{"name":"g","type":"double precision","value":1.23},{"name":"h","type":"character(10)","value":"teste "},{"name":"i","type":"character varying(30)","value":"testando"},{"name":"j","type":"text","value":"um texto longo"},{"name":"k","type":"bit varying(20)","value":"001110010101010"},{"name":"l","type":"timestamp without time zone","value":"Sat Nov 02 17:30:52 2013"},{"name":"m","type":"date","value":"02-04-2013"},{"name":"n","type":"boolean","value":true},{"name":"o","type":"json","value":"{ \"a\": 123 }"},{"name":"p","type":"tsvector","value":"'Old' 'Parr'"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_without_pk","identity":[{"name":"a","type":"smallint","value":1},{"name":"b","type":"smallint","value":1},{"name":"c","type":"integer","value":2},{"name":"d","type":"bigint","value":3},{"name":"e","type":"numeric(5,3)","value":3.540},{"name":"f","type":"real","value":876.563},{"name":"g","type":"double precision","value":1.23},{"name":"h","type":"character(10)","value":"teste "},{"name":"i","type":"character varying(30)","value":"testando"},{"name":"j","type":"text","value":"um texto longo"},{"name":"k","type":"bit varying(20)","value":"001110010101010"},{"name":"l","type":"timestamp without time zone","value":"Sat Nov 02 17:30:52 2013"},{"name":"m","type":"date","value":"02-04-2013"},{"name":"n","type":"boolean","value":true},{"name":"o","type":"json","value":"{ \"a\": 123 }"},{"name":"p","type":"tsvector","value":"'Old' 'Parr'"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_unique","identity":[{"name":"a","type":"smallint","value":1},{"name":"b","type":"smallint","value":1},{"name":"c","type":"integer","value":2},{"name":"d","type":"bigint","value":3},{"name":"e","type":"numeric(5,3)","value":3.540},{"name":"f","type":"real","value":876.563},{"name":"g","type":"double precision","value":1.23},{"name":"h","type":"character(10)","value":"teste "},{"name":"i","type":"character varying(30)","value":"testando"},{"name":"j","type":"text","value":"um texto longo"},{"name":"k","type":"bit varying(20)","value":"001110010101010"},{"name":"l","type":"timestamp without time zone","value":"Sat Nov 02 17:30:52 2013"},{"name":"m","type":"date","value":"02-04-2013"},{"name":"n","type":"boolean","value":true},{"name":"o","type":"json","value":"{ \"a\": 123 }"},{"name":"p","type":"tsvector","value":"'Old' 'Parr'"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(24 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
Expand Down
17 changes: 16 additions & 1 deletion expected/delete4.out
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ALTER TABLE table_with_unique REPLICA IDENTITY USING INDEX table_with_unique_g_n
DELETE FROM table_with_unique WHERE b = 1;
DELETE FROM table_with_unique WHERE n = true;
ALTER TABLE table_with_unique REPLICA IDENTITY DEFAULT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'include-typmod', '0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0');
data
-----------------------------------------------------------------
{ +
Expand Down Expand Up @@ -76,6 +76,21 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pre
}
(4 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
data
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_unique","identity":[{"name":"g","type":"double precision","value":1.23},{"name":"n","type":"boolean","value":false}]}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"table_with_unique","identity":[{"name":"g","type":"double precision","value":4.56},{"name":"n","type":"boolean","value":true}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(10 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
Expand Down
44 changes: 43 additions & 1 deletion expected/filtertable.out
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ INSERT INTO "filter""table_9" (a, b) VALUES(1, 'public.filter"table_9');
INSERT INTO " filter_table_10" (a, b) VALUES(1, 'public. filter_table_10');
INSERT INTO "*" (a, b) VALUES(1, 'public.*');
INSERT INTO "*".filter_table_0 (a, b) VALUES(1, '*.filter_table_0');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'filter-tables', ' foo.bar,*.filter_table_1 ,filter_schema_2.* , public.filter_table_3 , public.Filter_table_5, public.filter\ table_6, public.filter\.table_7 , public.filter\,table_8 , public.filter"table_9, *.\ filter_table_10 , public.\* , \*.filter_table_0 ');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'filter-tables', ' foo.bar,*.filter_table_1 ,filter_schema_2.* , public.filter_table_3 , public.Filter_table_5, public.filter\ table_6, public.filter\.table_7 , public.filter\,table_8 , public.filter"table_9, *.\ filter_table_10 , public.\* , \*.filter_table_0 ');
data
-------------------------------------------------------------------------------
{ +
Expand Down Expand Up @@ -169,6 +169,48 @@ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'pr
}
(17 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'filter-tables', ' foo.bar,*.filter_table_1 ,filter_schema_2.* , public.filter_table_3 , public.Filter_table_5, public.filter\ table_6, public.filter\.table_7 , public.filter\,table_8 , public.filter"table_9, *.\ filter_table_10 , public.\* , \*.filter_table_0 ');
data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"filter_schema_1","table":"filter_table_2","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"text","value":"filter_schema_1.filter_table_2"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"filter_table_2","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"text","value":"public.filter_table_2"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"filter_table_4","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"text","value":"public.filter_table_4"}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(37 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
Expand Down
6 changes: 3 additions & 3 deletions expected/include_lsn.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2js

-- One row should have one record and one nextlsn
INSERT INTO tbl VALUES (1);
SELECT count(*) = 1, count(distinct ((data::json)->'nextlsn')::text) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-lsn', '1');
SELECT count(*) = 1, count(distinct ((data::json)->'nextlsn')::text) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-lsn', '1');
?column? | ?column?
----------+----------
t | t
Expand All @@ -20,15 +20,15 @@ SELECT count(*) = 1, count(distinct ((data::json)->'nextlsn')::text) = 1 FROM pg
-- Two rows should have two records and two nextlsns
INSERT INTO tbl VALUES (2);
INSERT INTO tbl VALUES (3);
SELECT count(*) = 2, count(distinct ((data::json)->'nextlsn')::text) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-lsn', '1');
SELECT count(*) = 2, count(distinct ((data::json)->'nextlsn')::text) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-lsn', '1');
?column? | ?column?
----------+----------
t | t
(1 row)

-- Two rows in one transaction should have one record and one nextlsn
INSERT INTO tbl VALUES (4), (5);
SELECT count(*) = 1, count(distinct ((data::json)->'nextlsn')::text) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-lsn', '1');
SELECT count(*) = 1, count(distinct ((data::json)->'nextlsn')::text) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-lsn', '1');
?column? | ?column?
----------+----------
t | t
Expand Down
Loading

0 comments on commit 1b0cbac

Please sign in to comment.