Skip to content

Commit

Permalink
message_middleware: make message passing from Replication.Server -> R…
Browse files Browse the repository at this point in the history
…eplication.Publisher configurable

the aim of this is to allow the end user to implement a backpresure system if need be
for instance using broadway / dets / etc.
  • Loading branch information
DaemonSnake committed May 24, 2024
1 parent c1972f4 commit 07db85d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
11 changes: 9 additions & 2 deletions lib/walex/config/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ defmodule WalEx.Config do
"""
use Agent

alias WalEx.Replication.Publisher
alias WalEx.Config.Registry, as: WalExRegistry

@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name durable_slot)a
@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name durable_slot message_middleware)a
@allowed_config_values ~w(destinations event_relay modules subscriptions)a

def start_link(opts) do
Expand Down Expand Up @@ -127,7 +128,8 @@ defmodule WalEx.Config do
webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret),
event_relay: Keyword.get(configs, :event_relay),
slot_name: Keyword.get(configs, :slot_name) |> parse_slot_name(name),
durable_slot: Keyword.get(configs, :durable_slot, false) == true
durable_slot: Keyword.get(configs, :durable_slot, false) == true,
message_middleware: Keyword.get(configs, :message_middleware) |> parse_message_middleware()
]
end

Expand Down Expand Up @@ -200,6 +202,11 @@ defmodule WalEx.Config do
defp parse_slot_name(nil, app_name), do: to_string(app_name) <> "_walex"
defp parse_slot_name(slot_name, _), do: slot_name

defp parse_message_middleware(message_middleware) when is_function(message_middleware, 2),
do: message_middleware

defp parse_message_middleware(nil), do: &Publisher.process_message_async/2

defp set_url_opts(username, password, database, info) do
url_opts = [
username: username,
Expand Down
8 changes: 6 additions & 2 deletions lib/walex/replication/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ defmodule WalEx.Replication.Publisher do
GenServer.start_link(__MODULE__, opts, name: name)
end

def process_message(message, app_name) do
def process_message_async(message, app_name) do
name = registry_name(app_name)

GenServer.cast(name, %{message: message, app_name: app_name})
end

def process_message_sync(message, app_name) do
name = registry_name(app_name)
GenServer.call(name, %{message: message, app_name: app_name}, :infinity)
end

defp registry_name(app_name) do
Config.Registry.set_name(:set_gen_server, __MODULE__, app_name)
end
Expand Down
12 changes: 7 additions & 5 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule WalEx.Replication.Server do

alias WalEx.Config.Registry, as: WalExRegistry
alias WalEx.Decoder
alias WalEx.Replication.Publisher
alias WalEx.Replication.QueryBuilder

def start_link(opts) do
Expand Down Expand Up @@ -48,20 +47,23 @@ defmodule WalEx.Replication.Server do
[
slot_name: slot_name,
publication: publication,
durable_slot: durable_slot
durable_slot: durable_slot,
message_middleware: message_middleware
] =
WalEx.Config.get_configs(app_name, [
:slot_name,
:publication,
:durable_slot
:durable_slot,
:message_middleware
])

state = %{
step: :disconnected,
app_name: app_name,
slot_name: slot_name,
publication: publication,
durable_slot: durable_slot
durable_slot: durable_slot,
message_middleware: message_middleware
}

{:ok, state}
Expand Down Expand Up @@ -138,7 +140,7 @@ defmodule WalEx.Replication.Server do
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
rest
|> Decoder.decode_message()
|> Publisher.process_message(state.app_name)
|> state.message_middleware.(state.app_name)

{:noreply, state}
end
Expand Down

0 comments on commit 07db85d

Please sign in to comment.