Skip to content

Commit

Permalink
keep-alive: limit wal_end to the oldest transaction still being proce…
Browse files Browse the repository at this point in the history
…ssed

Currently we always return the server's wal_end+1
This approach has a few limits, can cause some issues.

If we start processing wal_end A,
then receive a keep-alive request with wal_end=A+x and reply A+x+1,
this before A if finished processing,
then if our process crashes the server will restart at A+x+1 and
will never receive it again.

To avoid this, this PR returns wal_end+1 only if there are no processing transactions
if there are it returns the wal_end of the oldest transaction still in progress.

This way we will be guaranteed to start back from there(A and not A+x+1) if we crash.
  • Loading branch information
DaemonSnake committed May 22, 2024
1 parent 74c03ec commit 265670b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
5 changes: 4 additions & 1 deletion lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule WalEx.Replication.Server do
"""
use Postgrex.ReplicationConnection

alias WalEx.Replication.Progress
alias WalEx.Config.Registry, as: WalExRegistry
alias WalEx.Decoder
alias WalEx.Replication.Publisher
Expand Down Expand Up @@ -144,9 +145,11 @@ defmodule WalEx.Replication.Server do
end

def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
wal_end = Progress.oldest_running_wal_end(state.app_name) || wal_end + 1

messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
1 -> [<<?r, wal_end::64, wal_end::64, wal_end::64, current_time()::64, 0>>]
0 -> []
end

Expand Down
41 changes: 40 additions & 1 deletion test/walex/database_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule WalEx.DatabaseTest do
use ExUnit.Case, async: false
import WalEx.Support.TestHelpers
alias WalEx.Replication.Progress
alias WalEx.Supervisor, as: WalExSupervisor

require Logger
Expand All @@ -9,9 +10,10 @@ defmodule WalEx.DatabaseTest do
@username "postgres"
@password "postgres"
@database "todos_test"
@app_name :todos

@base_configs [
name: :todos,
name: @app_name,
hostname: @hostname,
username: @username,
password: @password,
Expand Down Expand Up @@ -196,6 +198,43 @@ defmodule WalEx.DatabaseTest do

assert [^durable_slot] = pg_replication_slots(database_pid)
end

test "wal_end in keep-alive is server's wal_end+1 when no transaction are in progress" do
TestSupervisor.start_link(@base_configs)
Progress.start_link(app_name: @app_name)

server_wal_end = 6846
clock = 42
keep_alive_request = <<?k, server_wal_end::64, clock::64, 1>>
state = %{app_name: @app_name}

assert {:noreply, [reply], ^state} =
WalEx.Replication.Server.handle_data(keep_alive_request, state)

<<?r, wal_end::64, wal_end::64, wal_end::64, _clock::64, 0>> = reply
assert wal_end == server_wal_end + 1
end

test "wal_end in keep-alive reply matches last in progress transaction" do
TestSupervisor.start_link(@base_configs)
Progress.start_link(app_name: @app_name)

not_finished = 123
server_wal_end = 568
clock = 42
keep_alive_request = <<?k, server_wal_end::64, clock::64, 1>>
state = %{app_name: @app_name}

Progress.begin(@app_name, {0, not_finished})

assert {:noreply, [reply], ^state} =
WalEx.Replication.Server.handle_data(keep_alive_request, state)

<<?r, wal_end::64, wal_end::64, wal_end::64, _clock::64, 0>> = reply
assert wal_end == not_finished

Progress.done(@app_name, {0, not_finished})
end
end

@linux_path "/usr/lib/postgresql"
Expand Down

0 comments on commit 265670b

Please sign in to comment.