Skip to content

Commit

Permalink
Combine common code into functions
Browse files Browse the repository at this point in the history
Instead of repeating code into different wal2json format output
functions, combine the common code into functions and use them.
  • Loading branch information
eulerto committed Apr 26, 2021
1 parent 8139a6d commit a96dd31
Showing 1 changed file with 101 additions and 140 deletions.
241 changes: 101 additions & 140 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ static bool string_to_SelectTable(char *rawstring, char separator, List **select
static bool split_string_to_list(char *rawstring, char separator, List **sl);
static bool split_string_to_oid_list(char *rawstring, char separator, List **sl);

static bool pg_filter_by_action(int change_type, JsonAction actions);
static bool pg_filter_by_table(List *filter_tables, char *schemaname, char *tablename);
static bool pg_add_by_table(List *add_tables, char *schemaname, char *tablename);

/* version 1 */
static void pg_decode_begin_txn_v1(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
Expand Down Expand Up @@ -1468,6 +1472,83 @@ pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple
pfree(pktypes.data);
}

static bool
pg_filter_by_action(int change_type, JsonAction actions)
{
if (change_type == REORDER_BUFFER_CHANGE_INSERT && !actions.insert)
{
elog(DEBUG3, "ignore INSERT");
return true;
}
if (change_type == REORDER_BUFFER_CHANGE_UPDATE && !actions.update)
{
elog(DEBUG3, "ignore UPDATE");
return true;
}
if (change_type == REORDER_BUFFER_CHANGE_DELETE && !actions.delete)
{
elog(DEBUG3, "ignore DELETE");
return true;
}

return false;
}

static bool
pg_filter_by_table(List *filter_tables, char *schemaname, char *tablename)
{
if (list_length(filter_tables) > 0)
{
ListCell *lc;

foreach(lc, filter_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was filtered out",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
return true;
}
}
}
}

return false;
}

static bool
pg_add_by_table(List *add_tables, char *schemaname, char *tablename)
{
if (list_length(add_tables) > 0)
{
ListCell *lc;

/* all tables in all schemas are added by default */
foreach(lc, add_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was added",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
return true;
}
}
}
}

return false;
}

/* Callback for individual changed tuples */
static void
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Expand Down Expand Up @@ -1502,21 +1583,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

data = ctx->output_plugin_private;

if (change->action == REORDER_BUFFER_CHANGE_INSERT && !data->actions.insert)
{
elog(DEBUG3, "ignore INSERT");
return;
}
if (change->action == REORDER_BUFFER_CHANGE_UPDATE && !data->actions.update)
{
elog(DEBUG3, "ignore UPDATE");
/* filter changes by action */
if (pg_filter_by_action(change->action, data->actions))
return;
}
if (change->action == REORDER_BUFFER_CHANGE_DELETE && !data->actions.delete)
{
elog(DEBUG3, "ignore DELETE");
return;
}

class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
Expand All @@ -1535,69 +1604,21 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
RelationGetIndexList(relation);

/* Filter tables, if available */
if (list_length(data->filter_tables) > 0)
if (pg_filter_by_table(data->filter_tables, schemaname, tablename))
{
ListCell *lc;
bool skip = false;

foreach(lc, data->filter_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was filtered out",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
skip = true;
}
}
}

/* table was found */
if (skip)
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}

/* Add tables */
if (list_length(data->add_tables) > 0)
if (!pg_add_by_table(data->add_tables, schemaname, tablename))
{
ListCell *lc;
bool skip = true;

/* all tables in all schemas are added by default */
foreach(lc, data->add_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was added",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
skip = false;
}
}
}

/* table was not found */
if (skip)
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}

/* Sanity checks */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
Expand Down Expand Up @@ -2083,7 +2104,6 @@ pg_decode_write_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relat
/* make sure rd_pkindex and rd_replidindex are set */
RelationGetIndexList(relation);

/* sanity checks */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
Expand Down Expand Up @@ -2266,21 +2286,9 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
char *schemaname;
char *tablename;

if (change->action == REORDER_BUFFER_CHANGE_INSERT && !data->actions.insert)
{
elog(DEBUG3, "ignore INSERT");
return;
}
if (change->action == REORDER_BUFFER_CHANGE_UPDATE && !data->actions.update)
{
elog(DEBUG3, "ignore UPDATE");
/* filter changes by action */
if (pg_filter_by_action(change->action, data->actions))
return;
}
if (change->action == REORDER_BUFFER_CHANGE_DELETE && !data->actions.delete)
{
elog(DEBUG3, "ignore DELETE");
return;
}

/* avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
Expand All @@ -2290,66 +2298,19 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
tablename = RelationGetRelationName(relation);

/* Exclude tables, if available */
if (list_length(data->filter_tables) > 0)
if (pg_filter_by_table(data->filter_tables, schemaname, tablename))
{
ListCell *lc;
bool skip = false;

foreach(lc, data->filter_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was filtered out",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
skip = true;
}
}
}

/* table was found */
if (skip)
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}

/* Add tables */
if (list_length(data->add_tables) > 0)
if (!pg_add_by_table(data->add_tables, schemaname, tablename))
{
ListCell *lc;
bool skip = true;

/* all tables in all schemas are added by default */
foreach(lc, data->add_tables)
{
SelectTable *t = lfirst(lc);

if (t->allschemas || strcmp(t->schemaname, schemaname) == 0)
{
if (t->alltables || strcmp(t->tablename, tablename) == 0)
{
elog(DEBUG2, "\"%s\".\"%s\" was added",
((t->allschemas) ? "*" : t->schemaname),
((t->alltables) ? "*" : t->tablename));
skip = false;
}
}
}

/* table was not found */
if (skip)
{
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}

pg_decode_write_change(ctx, txn, relation, change);
Expand Down

0 comments on commit a96dd31

Please sign in to comment.