Skip to content

Commit

Permalink
Merge pull request #58 from tryriot/check-publication-exists
Browse files Browse the repository at this point in the history
Check publication exists
  • Loading branch information
cpursley committed May 3, 2024
2 parents f68e1df + f0f29a3 commit b6804d9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lib/walex/replication/query_builder.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule WalEx.Replication.QueryBuilder do
def publication_exists(state) do
"SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication}' LIMIT 1;"
end

def create_temporary_slot(state) do
"CREATE_REPLICATION_SLOT #{state.slot_name} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
end
Expand Down
11 changes: 11 additions & 0 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,21 @@ defmodule WalEx.Replication.Server do

@impl true
def handle_connect(state) do
query = QueryBuilder.publication_exists(state)
{:query, query, %{state | step: :publication_exists}}
end

@impl true
def handle_result([%Postgrex.Result{num_rows: 1}], state = %{step: :publication_exists}) do
query = QueryBuilder.create_temporary_slot(state)
{:query, query, %{state | step: :create_slot}}
end

@impl true
def handle_result(results, %{step: :publication_exists}) do
raise "Publication does not exist. #{inspect(results)}"
end

@impl true
def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
query = QueryBuilder.start_replication_slot(state)
Expand Down
10 changes: 10 additions & 0 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ defmodule WalEx.DatabaseTest do
assert [%{"wal_level" => "logical"}] == query(pid, "SHOW wal_level;")
end

test "error early if publication doesn't exists" do
Process.flag(:trap_exit, true)
config = Keyword.put(@base_configs, :publication, "non_existent_publication")

{_pid, ref} =
spawn_monitor(fn -> WalExSupervisor.start_link(config) end)

assert_receive {:DOWN, ^ref, _, _, {:shutdown, _}}
end

test "should start replication slot", %{database_pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
assert is_pid(replication_pid)
Expand Down

0 comments on commit b6804d9

Please sign in to comment.