From 07db85d3d3e52f5d1a62d2ffa9ad3f76874ae24e Mon Sep 17 00:00:00 2001 From: Bastien Penavayre Date: Fri, 24 May 2024 16:53:05 +0200 Subject: [PATCH] message_middleware: make message passing from Replication.Server -> Replication.Publisher configurable the aim of this is to allow the end user to implement a backpresure system if need be for instance using broadway / dets / etc. --- lib/walex/config/config.ex | 11 +++++++++-- lib/walex/replication/publisher.ex | 8 ++++++-- lib/walex/replication/server.ex | 12 +++++++----- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/lib/walex/config/config.ex b/lib/walex/config/config.ex index 7515a1b..c87360b 100644 --- a/lib/walex/config/config.ex +++ b/lib/walex/config/config.ex @@ -4,9 +4,10 @@ defmodule WalEx.Config do """ use Agent + alias WalEx.Replication.Publisher alias WalEx.Config.Registry, as: WalExRegistry - @allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name durable_slot)a + @allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name durable_slot message_middleware)a @allowed_config_values ~w(destinations event_relay modules subscriptions)a def start_link(opts) do @@ -127,7 +128,8 @@ defmodule WalEx.Config do webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret), event_relay: Keyword.get(configs, :event_relay), slot_name: Keyword.get(configs, :slot_name) |> parse_slot_name(name), - durable_slot: Keyword.get(configs, :durable_slot, false) == true + durable_slot: Keyword.get(configs, :durable_slot, false) == true, + message_middleware: Keyword.get(configs, :message_middleware) |> parse_message_middleware() ] end @@ -200,6 +202,11 @@ defmodule WalEx.Config do defp parse_slot_name(nil, app_name), do: to_string(app_name) <> "_walex" defp parse_slot_name(slot_name, _), do: slot_name + defp parse_message_middleware(message_middleware) when is_function(message_middleware, 2), + do: message_middleware + + defp parse_message_middleware(nil), do: &Publisher.process_message_async/2 + defp set_url_opts(username, password, database, info) do url_opts = [ username: username, diff --git a/lib/walex/replication/publisher.ex b/lib/walex/replication/publisher.ex index 8dc9fda..2e1f7a5 100644 --- a/lib/walex/replication/publisher.ex +++ b/lib/walex/replication/publisher.ex @@ -27,12 +27,16 @@ defmodule WalEx.Replication.Publisher do GenServer.start_link(__MODULE__, opts, name: name) end - def process_message(message, app_name) do + def process_message_async(message, app_name) do name = registry_name(app_name) - GenServer.cast(name, %{message: message, app_name: app_name}) end + def process_message_sync(message, app_name) do + name = registry_name(app_name) + GenServer.call(name, %{message: message, app_name: app_name}, :infinity) + end + defp registry_name(app_name) do Config.Registry.set_name(:set_gen_server, __MODULE__, app_name) end diff --git a/lib/walex/replication/server.ex b/lib/walex/replication/server.ex index 07a4daf..d1de33c 100644 --- a/lib/walex/replication/server.ex +++ b/lib/walex/replication/server.ex @@ -9,7 +9,6 @@ defmodule WalEx.Replication.Server do alias WalEx.Config.Registry, as: WalExRegistry alias WalEx.Decoder - alias WalEx.Replication.Publisher alias WalEx.Replication.QueryBuilder def start_link(opts) do @@ -48,12 +47,14 @@ defmodule WalEx.Replication.Server do [ slot_name: slot_name, publication: publication, - durable_slot: durable_slot + durable_slot: durable_slot, + message_middleware: message_middleware ] = WalEx.Config.get_configs(app_name, [ :slot_name, :publication, - :durable_slot + :durable_slot, + :message_middleware ]) state = %{ @@ -61,7 +62,8 @@ defmodule WalEx.Replication.Server do app_name: app_name, slot_name: slot_name, publication: publication, - durable_slot: durable_slot + durable_slot: durable_slot, + message_middleware: message_middleware } {:ok, state} @@ -138,7 +140,7 @@ defmodule WalEx.Replication.Server do def handle_data(<>, state) do rest |> Decoder.decode_message() - |> Publisher.process_message(state.app_name) + |> state.message_middleware.(state.app_name) {:noreply, state} end