Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Durable slot #62

Merged
merged 4 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/walex/config/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule WalEx.Config do

alias WalEx.Config.Registry, as: WalExRegistry

@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name)a
@allowed_config_value ~w(database hostname name password port publication username webhook_signing_secret slot_name durable_slot)a
@allowed_config_values ~w(destinations event_relay modules subscriptions)a

def start_link(opts) do
Expand Down Expand Up @@ -126,7 +126,8 @@ defmodule WalEx.Config do
destinations: Keyword.put(destinations, :modules, module_names),
webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret),
event_relay: Keyword.get(configs, :event_relay),
slot_name: Keyword.get(configs, :slot_name) |> parse_slot_name(name)
slot_name: Keyword.get(configs, :slot_name) |> parse_slot_name(name),
durable_slot: Keyword.get(configs, :durable_slot, false) == true
]
end

Expand Down
8 changes: 8 additions & 0 deletions lib/walex/replication/query_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@ defmodule WalEx.Replication.QueryBuilder do
"SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication}' LIMIT 1;"
end

def slot_exists(state) do
"SELECT active FROM pg_replication_slots WHERE slot_name = '#{state.slot_name}' LIMIT 1;"
end

def create_temporary_slot(state) do
"CREATE_REPLICATION_SLOT #{state.slot_name} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
end

def create_durable_slot(state) do
"CREATE_REPLICATION_SLOT #{state.slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
end

def start_replication_slot(state) do
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication}')"
end
Expand Down
61 changes: 55 additions & 6 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,23 @@ defmodule WalEx.Replication.Server do
def init(opts) do
app_name = Keyword.get(opts, :app_name)

[slot_name: slot_name, publication: publication] =
[
slot_name: slot_name,
publication: publication,
durable_slot: durable_slot
] =
WalEx.Config.get_configs(app_name, [
:slot_name,
:publication
:publication,
:durable_slot
])

state = %{
step: :disconnected,
app_name: app_name,
slot_name: slot_name,
publication: publication
publication: publication,
durable_slot: durable_slot
}

{:ok, state}
Expand All @@ -69,13 +75,50 @@ defmodule WalEx.Replication.Server do

@impl true
def handle_result([%Postgrex.Result{num_rows: 1}], state = %{step: :publication_exists}) do
query = QueryBuilder.create_temporary_slot(state)
if state.durable_slot do
query = QueryBuilder.slot_exists(state)
{:query, query, %{state | step: :slot_exists}}
else
query = QueryBuilder.create_temporary_slot(state)
{:query, query, %{state | step: :create_slot}}
end
end

@impl true
def handle_result(results, %{step: :publication_exists} = state) do
case results do
[%Postgrex.Result{num_rows: 0}] ->
raise "Publication doesn't exists. publication: #{inspect(state.publication)}"

_ ->
raise "Unexpected result when checking if publication exists. #{inspect(results)}"
end
end

@impl true
def handle_result([%Postgrex.Result{num_rows: 0}], state = %{step: :slot_exists}) do
query = QueryBuilder.create_durable_slot(state)
{:query, query, %{state | step: :create_slot}}
end

@impl true
def handle_result(results, %{step: :publication_exists}) do
raise "Publication does not exist. #{inspect(results)}"
def handle_result(
[%Postgrex.Result{columns: ["active"], rows: [[active]]}],
state = %{step: :slot_exists}
) do
case active do
"f" ->
query = QueryBuilder.start_replication_slot(state)
{:stream, query, [], %{state | step: :streaming}}

"t" ->
raise "Durable slot already active"
end
end

@impl true
def handle_result(results, %{step: :slot_exists}) do
raise "Failed to check if durable slot already exists. #{inspect(results)}"
end

@impl true
Expand All @@ -84,6 +127,12 @@ defmodule WalEx.Replication.Server do
{:stream, query, [], %{state | step: :streaming}}
end

@impl true
def handle_result(%Postgrex.Error{} = error, %{step: :create_slot}) do
# if durable slot, can happen if multiple instances try to create the same slot
raise "Failed to create replication slot, #{inspect(error)}"
end

@impl true
# https://www.postgresql.org/docs/14/protocol-replication.html
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
Expand Down
9 changes: 8 additions & 1 deletion test/support/test_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,18 @@ defmodule WalEx.Support.TestHelpers do

def pg_replication_slots(database_pid) do
pg_replication_slots_query =
"SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"
"SELECT slot_name, slot_type, active, temporary FROM \"pg_replication_slots\";"

query(database_pid, pg_replication_slots_query)
end

def pg_drop_slots(database_pid) do
pg_drop_slots_query =
"SELECT pg_drop_replication_slot(slot_name) FROM \"pg_replication_slots\";"

query(database_pid, pg_drop_slots_query)
end

def update_user(database_pid) do
update_user = """
UPDATE \"user\" SET age = 30 WHERE id = 1
Expand Down
3 changes: 2 additions & 1 deletion test/walex/config/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ defmodule WalEx.ConfigTest do
destinations: [modules: [MyApp.CustomModule]],
webhook_signing_secret: nil,
event_relay: nil,
slot_name: "my_app_walex"
slot_name: "my_app_walex",
durable_slot: false
] == Config.get_configs(@app_name)
end
end
Expand Down
61 changes: 57 additions & 4 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ defmodule WalEx.DatabaseTest do
destinations: [modules: [TestModule]]
]

@replication_slot %{"active" => true, "slot_name" => "todos_walex", "slot_type" => "logical"}
@replication_slot %{
"active" => true,
"slot_name" => "todos_walex",
"slot_type" => "logical",
"temporary" => true
}

describe "logical replication" do
setup do
{:ok, database_pid} = start_database()
pg_drop_slots(database_pid)

%{database_pid: database_pid}
end
Expand All @@ -49,7 +55,7 @@ defmodule WalEx.DatabaseTest do
test "should start replication slot", %{database_pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
assert is_pid(replication_pid)
assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid)
assert [@replication_slot] = pg_replication_slots(database_pid)
end

test "user-defined slot_name", %{database_pid: database_pid} do
Expand All @@ -60,7 +66,7 @@ defmodule WalEx.DatabaseTest do
assert {:ok, replication_pid} = WalExSupervisor.start_link(config)

assert is_pid(replication_pid)
assert [slot | _replication_slots] = pg_replication_slots(database_pid)
assert [slot] = pg_replication_slots(database_pid)
assert Map.fetch!(slot, "slot_name") == slot_name
end

Expand All @@ -69,7 +75,7 @@ defmodule WalEx.DatabaseTest do
database_pid = get_database_pid(supervisor_pid)

assert is_pid(database_pid)
assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid)
assert [@replication_slot] = pg_replication_slots(database_pid)

assert Process.exit(database_pid, :kill)
|> tap_debug("Forcefully killed database connection: ")
Expand Down Expand Up @@ -143,6 +149,53 @@ defmodule WalEx.DatabaseTest do
refute database_pid == new_database_pid
assert_update_user(new_database_pid)
end

test "durable replication slot", %{database_pid: database_pid} do
assert [] = pg_replication_slots(database_pid)

slot_name = "durable_slot"
durable_opts = Keyword.merge(@base_configs, durable_slot: true, slot_name: slot_name)

durable_slot = %{
"active" => true,
"slot_name" => slot_name,
"slot_type" => "logical",
"temporary" => false
}

stopped_slot = Map.replace(durable_slot, "active", false)

start_supervised!({WalExSupervisor, durable_opts},
restart: :temporary,
id: :ok_supervisor
)

assert [^durable_slot] = pg_replication_slots(database_pid)

other_app_opts = Keyword.replace!(durable_opts, :name, :other_app)

assert {:error, {{:shutdown, error}, _}} =
start_supervised({WalExSupervisor, other_app_opts},
restart: :temporary,
id: :other_app_supervisor
)

assert {:failed_to_start_child, WalEx.Replication.Supervisor, {:shutdown, error}} = error
assert {:failed_to_start_child, WalEx.Replication.Server, error} = error
assert %RuntimeError{message: "Durable slot already active"} = error

stop_supervised(:ok_supervisor)
# sleep to make sure that Postgres detect that the connection is closed
Process.sleep(1_000)
assert [^stopped_slot] = pg_replication_slots(database_pid)

start_supervised!({WalExSupervisor, durable_opts},
restart: :temporary,
id: :ok_supervisor
)

assert [^durable_slot] = pg_replication_slots(database_pid)
end
end

@linux_path "/usr/lib/postgresql"
Expand Down
Loading