From 265670bf779b22aa53e664070b1403be848d332b Mon Sep 17 00:00:00 2001 From: Bastien Penavayre Date: Fri, 10 May 2024 16:13:37 +0200 Subject: [PATCH] keep-alive: limit wal_end to the oldest transaction still being processed Currently we always return the server's wal_end+1 This approach has a few limits, can cause some issues. If we start processing wal_end A, then receive a keep-alive request with wal_end=A+x and reply A+x+1, this before A if finished processing, then if our process crashes the server will restart at A+x+1 and will never receive it again. To avoid this, this PR returns wal_end+1 only if there are no processing transactions if there are it returns the wal_end of the oldest transaction still in progress. This way we will be guaranteed to start back from there(A and not A+x+1) if we crash. --- lib/walex/replication/server.ex | 5 +++- test/walex/database_test.exs | 41 ++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/lib/walex/replication/server.ex b/lib/walex/replication/server.ex index 07a4daf..780aca3 100644 --- a/lib/walex/replication/server.ex +++ b/lib/walex/replication/server.ex @@ -7,6 +7,7 @@ defmodule WalEx.Replication.Server do """ use Postgrex.ReplicationConnection + alias WalEx.Replication.Progress alias WalEx.Config.Registry, as: WalExRegistry alias WalEx.Decoder alias WalEx.Replication.Publisher @@ -144,9 +145,11 @@ defmodule WalEx.Replication.Server do end def handle_data(<>, state) do + wal_end = Progress.oldest_running_wal_end(state.app_name) || wal_end + 1 + messages = case reply do - 1 -> [<>] + 1 -> [<>] 0 -> [] end diff --git a/test/walex/database_test.exs b/test/walex/database_test.exs index 4b84e66..c1e4c61 100644 --- a/test/walex/database_test.exs +++ b/test/walex/database_test.exs @@ -1,6 +1,7 @@ defmodule WalEx.DatabaseTest do use ExUnit.Case, async: false import WalEx.Support.TestHelpers + alias WalEx.Replication.Progress alias WalEx.Supervisor, as: WalExSupervisor require Logger @@ -9,9 +10,10 @@ defmodule WalEx.DatabaseTest do @username "postgres" @password "postgres" @database "todos_test" + @app_name :todos @base_configs [ - name: :todos, + name: @app_name, hostname: @hostname, username: @username, password: @password, @@ -196,6 +198,43 @@ defmodule WalEx.DatabaseTest do assert [^durable_slot] = pg_replication_slots(database_pid) end + + test "wal_end in keep-alive is server's wal_end+1 when no transaction are in progress" do + TestSupervisor.start_link(@base_configs) + Progress.start_link(app_name: @app_name) + + server_wal_end = 6846 + clock = 42 + keep_alive_request = <> + state = %{app_name: @app_name} + + assert {:noreply, [reply], ^state} = + WalEx.Replication.Server.handle_data(keep_alive_request, state) + + <> = reply + assert wal_end == server_wal_end + 1 + end + + test "wal_end in keep-alive reply matches last in progress transaction" do + TestSupervisor.start_link(@base_configs) + Progress.start_link(app_name: @app_name) + + not_finished = 123 + server_wal_end = 568 + clock = 42 + keep_alive_request = <> + state = %{app_name: @app_name} + + Progress.begin(@app_name, {0, not_finished}) + + assert {:noreply, [reply], ^state} = + WalEx.Replication.Server.handle_data(keep_alive_request, state) + + <> = reply + assert wal_end == not_finished + + Progress.done(@app_name, {0, not_finished}) + end end @linux_path "/usr/lib/postgresql"