From a96dd31fdf464935febdb9f862ab6f064abd511a Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Sun, 25 Apr 2021 22:18:03 -0300 Subject: [PATCH] Combine common code into functions Instead of repeating code into different wal2json format output functions, combine the common code into functions and use them. --- wal2json.c | 241 ++++++++++++++++++++++------------------------------- 1 file changed, 101 insertions(+), 140 deletions(-) diff --git a/wal2json.c b/wal2json.c index 1285e0abfa2d..daa78e12705b 100644 --- a/wal2json.c +++ b/wal2json.c @@ -144,6 +144,10 @@ static bool string_to_SelectTable(char *rawstring, char separator, List **select static bool split_string_to_list(char *rawstring, char separator, List **sl); static bool split_string_to_oid_list(char *rawstring, char separator, List **sl); +static bool pg_filter_by_action(int change_type, JsonAction actions); +static bool pg_filter_by_table(List *filter_tables, char *schemaname, char *tablename); +static bool pg_add_by_table(List *add_tables, char *schemaname, char *tablename); + /* version 1 */ static void pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); @@ -1468,6 +1472,83 @@ pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple pfree(pktypes.data); } +static bool +pg_filter_by_action(int change_type, JsonAction actions) +{ + if (change_type == REORDER_BUFFER_CHANGE_INSERT && !actions.insert) + { + elog(DEBUG3, "ignore INSERT"); + return true; + } + if (change_type == REORDER_BUFFER_CHANGE_UPDATE && !actions.update) + { + elog(DEBUG3, "ignore UPDATE"); + return true; + } + if (change_type == REORDER_BUFFER_CHANGE_DELETE && !actions.delete) + { + elog(DEBUG3, "ignore DELETE"); + return true; + } + + return false; +} + +static bool +pg_filter_by_table(List *filter_tables, char *schemaname, char *tablename) +{ + if (list_length(filter_tables) > 0) + { + ListCell *lc; + + foreach(lc, filter_tables) + { + SelectTable *t = lfirst(lc); + + if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) + { + if (t->alltables || strcmp(t->tablename, tablename) == 0) + { + elog(DEBUG2, "\"%s\".\"%s\" was filtered out", + ((t->allschemas) ? "*" : t->schemaname), + ((t->alltables) ? "*" : t->tablename)); + return true; + } + } + } + } + + return false; +} + +static bool +pg_add_by_table(List *add_tables, char *schemaname, char *tablename) +{ + if (list_length(add_tables) > 0) + { + ListCell *lc; + + /* all tables in all schemas are added by default */ + foreach(lc, add_tables) + { + SelectTable *t = lfirst(lc); + + if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) + { + if (t->alltables || strcmp(t->tablename, tablename) == 0) + { + elog(DEBUG2, "\"%s\".\"%s\" was added", + ((t->allschemas) ? "*" : t->schemaname), + ((t->alltables) ? "*" : t->tablename)); + return true; + } + } + } + } + + return false; +} + /* Callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -1502,21 +1583,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, data = ctx->output_plugin_private; - if (change->action == REORDER_BUFFER_CHANGE_INSERT && !data->actions.insert) - { - elog(DEBUG3, "ignore INSERT"); - return; - } - if (change->action == REORDER_BUFFER_CHANGE_UPDATE && !data->actions.update) - { - elog(DEBUG3, "ignore UPDATE"); + /* filter changes by action */ + if (pg_filter_by_action(change->action, data->actions)) return; - } - if (change->action == REORDER_BUFFER_CHANGE_DELETE && !data->actions.delete) - { - elog(DEBUG3, "ignore DELETE"); - return; - } class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -1535,69 +1604,21 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationGetIndexList(relation); /* Filter tables, if available */ - if (list_length(data->filter_tables) > 0) + if (pg_filter_by_table(data->filter_tables, schemaname, tablename)) { - ListCell *lc; - bool skip = false; - - foreach(lc, data->filter_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was filtered out", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - skip = true; - } - } - } - - /* table was found */ - if (skip) - { - MemoryContextSwitchTo(old); - MemoryContextReset(data->context); - return; - } + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + return; } /* Add tables */ - if (list_length(data->add_tables) > 0) + if (!pg_add_by_table(data->add_tables, schemaname, tablename)) { - ListCell *lc; - bool skip = true; - - /* all tables in all schemas are added by default */ - foreach(lc, data->add_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was added", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - skip = false; - } - } - } - - /* table was not found */ - if (skip) - { - MemoryContextSwitchTo(old); - MemoryContextReset(data->context); - return; - } + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + return; } - /* Sanity checks */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -2083,7 +2104,6 @@ pg_decode_write_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relat /* make sure rd_pkindex and rd_replidindex are set */ RelationGetIndexList(relation); - /* sanity checks */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -2266,21 +2286,9 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, char *schemaname; char *tablename; - if (change->action == REORDER_BUFFER_CHANGE_INSERT && !data->actions.insert) - { - elog(DEBUG3, "ignore INSERT"); - return; - } - if (change->action == REORDER_BUFFER_CHANGE_UPDATE && !data->actions.update) - { - elog(DEBUG3, "ignore UPDATE"); + /* filter changes by action */ + if (pg_filter_by_action(change->action, data->actions)) return; - } - if (change->action == REORDER_BUFFER_CHANGE_DELETE && !data->actions.delete) - { - elog(DEBUG3, "ignore DELETE"); - return; - } /* avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -2290,66 +2298,19 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, tablename = RelationGetRelationName(relation); /* Exclude tables, if available */ - if (list_length(data->filter_tables) > 0) + if (pg_filter_by_table(data->filter_tables, schemaname, tablename)) { - ListCell *lc; - bool skip = false; - - foreach(lc, data->filter_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was filtered out", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - skip = true; - } - } - } - - /* table was found */ - if (skip) - { - MemoryContextSwitchTo(old); - MemoryContextReset(data->context); - return; - } + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + return; } /* Add tables */ - if (list_length(data->add_tables) > 0) + if (!pg_add_by_table(data->add_tables, schemaname, tablename)) { - ListCell *lc; - bool skip = true; - - /* all tables in all schemas are added by default */ - foreach(lc, data->add_tables) - { - SelectTable *t = lfirst(lc); - - if (t->allschemas || strcmp(t->schemaname, schemaname) == 0) - { - if (t->alltables || strcmp(t->tablename, tablename) == 0) - { - elog(DEBUG2, "\"%s\".\"%s\" was added", - ((t->allschemas) ? "*" : t->schemaname), - ((t->alltables) ? "*" : t->tablename)); - skip = false; - } - } - } - - /* table was not found */ - if (skip) - { - MemoryContextSwitchTo(old); - MemoryContextReset(data->context); - return; - } + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + return; } pg_decode_write_change(ctx, txn, relation, change);