Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configurable slot name #57

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/walex/config/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule WalEx.Config do

alias WalEx.Config.Registry, as: WalExRegistry

@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret)a
@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name)a
@allowed_config_values ~w(destinations event_relay modules subscriptions)a

def start_link(opts) do
Expand Down Expand Up @@ -38,8 +38,11 @@ defmodule WalEx.Config do
end

def get_configs(app_name, keys) when is_list(keys) and keys != [] do
order_map = keys |> Enum.with_index() |> Map.new()

WalExRegistry.get_state(:get_agent, __MODULE__, app_name)
|> Keyword.take(keys)
|> Enum.sort_by(fn {k, _} -> Map.get(order_map, k) end)
end

def get_database(app_name), do: get_configs(app_name, :database)
Expand Down Expand Up @@ -122,7 +125,8 @@ defmodule WalEx.Config do
publication: Keyword.get(configs, :publication),
destinations: Keyword.put(destinations, :modules, module_names),
webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret),
event_relay: Keyword.get(configs, :event_relay)
event_relay: Keyword.get(configs, :event_relay),
slot_name: Keyword.get(configs, :slot_name) |> parse_slot_name(name)
]
end

Expand Down Expand Up @@ -192,6 +196,9 @@ defmodule WalEx.Config do
do: {k, if(is_binary(v), do: URI.decode(v), else: v)}
end

defp parse_slot_name(nil, app_name), do: to_string(app_name) <> "_walex"
defp parse_slot_name(slot_name, _), do: slot_name

defp set_url_opts(username, password, database, info) do
url_opts = [
username: username,
Expand Down
9 changes: 9 additions & 0 deletions lib/walex/replication/query_builder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule WalEx.Replication.QueryBuilder do
def create_temporary_slot(state) do
"CREATE_REPLICATION_SLOT #{state.slot_name} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
end

def start_replication_slot(state) do
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication}')"
end
end
43 changes: 28 additions & 15 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule WalEx.Replication.Server do
alias WalEx.Config.Registry, as: WalExRegistry
alias WalEx.Decoder
alias WalEx.Replication.Publisher
alias WalEx.Replication.QueryBuilder

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
Expand All @@ -19,7 +20,17 @@ defmodule WalEx.Replication.Server do
end

defp set_pgx_replication_conn_opts(app_name) do
database_configs_keys = [:hostname, :username, :password, :port, :database, :ssl, :ssl_opts, :socket_options]
database_configs_keys = [
:hostname,
:username,
:password,
:port,
:database,
:ssl,
:ssl_opts,
:socket_options
]

extra_opts = [auto_reconnect: true]
database_configs = WalEx.Config.get_configs(app_name, database_configs_keys)

Expand All @@ -34,27 +45,31 @@ defmodule WalEx.Replication.Server do
def init(opts) do
app_name = Keyword.get(opts, :app_name)

{:ok, %{step: :disconnected, app_name: app_name}}
[slot_name: slot_name, publication: publication] =
WalEx.Config.get_configs(app_name, [
:slot_name,
:publication
])

state = %{
step: :disconnected,
app_name: app_name,
slot_name: slot_name,
publication: publication
}

{:ok, state}
end

@impl true
def handle_connect(state) do
query =
"CREATE_REPLICATION_SLOT #{slot_name(state.app_name)} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"

query = QueryBuilder.create_temporary_slot(state)
{:query, query, %{state | step: :create_slot}}
end

@impl true
def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
publication =
state.app_name
|> WalEx.Config.get_configs([:publication])
|> Keyword.get(:publication)

query =
"START_REPLICATION SLOT #{slot_name(state.app_name)} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"

query = QueryBuilder.start_replication_slot(state)
{:stream, query, [], %{state | step: :streaming}}
end

Expand All @@ -80,6 +95,4 @@ defmodule WalEx.Replication.Server do

@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time, do: System.os_time(:microsecond) - @epoch

defp slot_name(app_name), do: to_string(app_name) <> "_walex"
end
9 changes: 5 additions & 4 deletions test/walex/config/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ defmodule WalEx.ConfigTest do
hostname: @hostname,
username: @username,
password: @password,
port: 5432,
database: @database,
port: 5432,
ssl: false,
ssl_opts: [verify: :verify_none]
] ==
Expand Down Expand Up @@ -91,7 +91,8 @@ defmodule WalEx.ConfigTest do
publication: "publication",
destinations: [modules: [MyApp.CustomModule]],
webhook_signing_secret: nil,
event_relay: nil
event_relay: nil,
slot_name: "my_app_walex"
] == Config.get_configs(@app_name)
end
end
Expand Down Expand Up @@ -130,15 +131,15 @@ defmodule WalEx.ConfigTest do
ssl: false,
ssl_opts: [verify: :verify_none]
] ==
Config.get_configs(@app_name, [:database, :name, :ssl, :ssl_opts])
Config.get_configs(@app_name, [:name, :database, :ssl, :ssl_opts])

assert [
name: :other_name,
database: "other_database",
ssl: false,
ssl_opts: [verify: :verify_none]
] ==
Config.get_configs(:other_name, [:database, :name, :ssl, :ssl_opts])
Config.get_configs(:other_name, [:name, :database, :ssl, :ssl_opts])
end
end

Expand Down
12 changes: 12 additions & 0 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ defmodule WalEx.DatabaseTest do
assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid)
end

test "user-defined slot_name", %{database_pid: database_pid} do
slot_name = "userdefined"

config = Keyword.put(@base_configs, :slot_name, slot_name)

assert {:ok, replication_pid} = WalExSupervisor.start_link(config)

assert is_pid(replication_pid)
assert [slot | _replication_slots] = pg_replication_slots(database_pid)
assert Map.fetch!(slot, "slot_name") == slot_name
end

test "should re-initiate after forcing database process termination" do
assert {:ok, supervisor_pid} = TestSupervisor.start_link(@base_configs)
database_pid = get_database_pid(supervisor_pid)
Expand Down
Loading