diff --git a/.gitignore b/.gitignore index e8d58ca..fad61a8 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,8 @@ walex-*.tar .elixir_ls/ # VSCode -.vscode/ \ No newline at end of file +.vscode/ + +# .DS_Store files from macOS +.DS_Store +**/.DS_Store \ No newline at end of file diff --git a/lib/walex/replication/publisher.ex b/lib/walex/replication/publisher.ex index f4f71c0..dc88843 100644 --- a/lib/walex/replication/publisher.ex +++ b/lib/walex/replication/publisher.ex @@ -4,7 +4,7 @@ defmodule WalEx.Replication.Publisher do """ use GenServer - alias WalEx.{Changes, Destinations, Events, Types} + alias WalEx.{Changes, Config, Destinations, Events, Types} alias WalEx.Decoder.Messages defmodule(State, @@ -18,16 +18,27 @@ defmodule WalEx.Replication.Publisher do defstruct [:relations] - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) + def start_link(opts) do + name = + opts + |> Keyword.get(:app_name) + |> registry_name + + GenServer.start_link(__MODULE__, opts, name: name) end def process_message(message, app_name) do - GenServer.cast(__MODULE__, %{message: message, app_name: app_name}) + name = registry_name(app_name) + + GenServer.cast(name, %{message: message, app_name: app_name}) + end + + defp registry_name(app_name) do + Config.Registry.set_name(:set_gen_server, __MODULE__, app_name) end @impl true - def init(_) do + def init(_opts) do Process.flag(:message_queue_data, :off_heap) {:ok, %State{}} diff --git a/lib/walex/replication/server.ex b/lib/walex/replication/server.ex index a5c34c7..cb8cf69 100644 --- a/lib/walex/replication/server.ex +++ b/lib/walex/replication/server.ex @@ -5,7 +5,6 @@ defmodule WalEx.Replication.Server do @moduledoc """ This module is responsible for setting up the replication connection """ - use Postgrex.ReplicationConnection alias WalEx.Config.Registry, as: WalExRegistry @@ -35,10 +34,6 @@ defmodule WalEx.Replication.Server do def init(opts) do app_name = Keyword.get(opts, :app_name) - if is_nil(Process.whereis(Publisher)) do - {:ok, _pid} = Publisher.start_link([]) - end - {:ok, %{step: :disconnected, app_name: app_name}} end diff --git a/lib/walex/replication/supervisor.ex b/lib/walex/replication/supervisor.ex index 0201203..b39a7f7 100755 --- a/lib/walex/replication/supervisor.ex +++ b/lib/walex/replication/supervisor.ex @@ -3,7 +3,7 @@ defmodule WalEx.Replication.Supervisor do use Supervisor - alias WalEx.Replication.Server + alias WalEx.Replication.{Publisher, Server} def start_link(opts) do app_name = Keyword.get(opts, :app_name) @@ -19,7 +19,10 @@ defmodule WalEx.Replication.Supervisor do |> Keyword.get(:configs) |> Keyword.get(:app_name) - children = [{Server, app_name: app_name}] + children = [ + {Publisher, app_name: app_name}, + {Server, app_name: app_name} + ] Supervisor.init(children, strategy: :one_for_all) end diff --git a/mix.exs b/mix.exs index 8d73da8..6e9dbd5 100644 --- a/mix.exs +++ b/mix.exs @@ -14,7 +14,8 @@ defmodule WalEx.MixProject do aliases: aliases(), name: "WalEx", source_url: "https://github.com/cpursley/walex", - test_coverage: [tool: ExCoveralls] + test_coverage: [tool: ExCoveralls], + elixirc_paths: elixirc_paths(Mix.env()) ] end @@ -72,4 +73,7 @@ defmodule WalEx.MixProject do ] ] end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] end diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex new file mode 100644 index 0000000..34a2a8e --- /dev/null +++ b/test/support/test_helpers.ex @@ -0,0 +1,12 @@ +defmodule WalEx.Support.TestHelpers do + def find_worker_pid(supervisor_pid, child_module) do + supervisor_pid + |> Supervisor.which_children() + |> find_pid(child_module) + end + + defp find_pid(children, module_name) do + {_, pid, _, _} = Enum.find(children, fn {module, _, _, _} -> module == module_name end) + pid + end +end diff --git a/test/walex/config/config_test.exs b/test/walex/config/config_test.exs index 722a6b2..c3e4c83 100644 --- a/test/walex/config/config_test.exs +++ b/test/walex/config/config_test.exs @@ -4,16 +4,29 @@ defmodule WalEx.ConfigTest do alias WalEx.Config alias Config.Registry, as: WalExRegistry + @base_configs [ + name: :test_name, + hostname: "hostname", + username: "username", + password: "password", + database: "database", + port: 5432, + subscriptions: ["subscriptions"], + publication: "publication", + modules: [MyApp.CustomModule], + ssl: false, + ssl_opts: [verify: :verify_none] + ] + setup_all do - assert {:ok, pid} = WalExRegistry.start_registry() - assert is_pid(pid) + {:ok, _pid} = WalExRegistry.start_registry() :timer.sleep(1000) :ok end describe "start_link/2" do test "should start a process" do - assert {:ok, pid} = Config.start_link(configs: get_base_configs()) + {:ok, pid} = Config.start_link(configs: @base_configs) assert is_pid(pid) end @@ -26,7 +39,7 @@ defmodule WalEx.ConfigTest do modules: ["modules"] ] - assert {:ok, pid} = Config.start_link(configs: configs) + {:ok, pid} = Config.start_link(configs: configs) assert is_pid(pid) assert [ @@ -52,8 +65,7 @@ defmodule WalEx.ConfigTest do describe "get_configs/" do setup do - assert {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -79,8 +91,7 @@ defmodule WalEx.ConfigTest do describe "get_configs/2" do setup do - assert {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -100,7 +111,7 @@ defmodule WalEx.ConfigTest do test "should filter configs by process name" do configs = - get_base_configs() + @base_configs |> Keyword.replace(:name, :other_name) |> Keyword.replace(:database, "other_database") @@ -127,8 +138,7 @@ defmodule WalEx.ConfigTest do describe "add_config/3" do setup do - {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -152,8 +162,7 @@ defmodule WalEx.ConfigTest do describe "remove_config/3" do setup do - {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -175,8 +184,7 @@ defmodule WalEx.ConfigTest do describe "replace_config/3" do setup do - {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -191,8 +199,7 @@ defmodule WalEx.ConfigTest do describe "build_module_names/3" do setup do - {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -220,8 +227,7 @@ defmodule WalEx.ConfigTest do describe "to_module_name/1" do setup do - {:ok, pid} = Config.start_link(configs: get_base_configs()) - assert is_pid(pid) + {:ok, _pid} = Config.start_link(configs: @base_configs) :ok end @@ -237,28 +243,4 @@ defmodule WalEx.ConfigTest do assert "TestName" == Config.to_module_name(:"Elixir.TestName") end end - - defp get_base_configs(keys \\ []) do - configs = [ - name: :test_name, - hostname: "hostname", - username: "username", - password: "password", - database: "database", - port: 5432, - subscriptions: ["subscriptions"], - publication: "publication", - modules: [MyApp.CustomModule], - ssl: false, - ssl_opts: [verify: :verify_none] - ] - - case keys do - [] -> - configs - - _keys -> - Keyword.take(configs, keys) - end - end end diff --git a/test/walex/config/registry_test.exs b/test/walex/config/registry_test.exs index f242800..a3b5c61 100644 --- a/test/walex/config/registry_test.exs +++ b/test/walex/config/registry_test.exs @@ -13,7 +13,7 @@ defmodule WalEx.Config.RegistryTest do describe "set_name/3" do setup do - assert {:ok, _pid} = WalExRegistry.start_registry() + {:ok, _pid} = WalExRegistry.start_registry() :ok end @@ -35,21 +35,14 @@ defmodule WalEx.Config.RegistryTest do describe "get_state/3" do setup do - assert {:ok, _pid} = WalExRegistry.start_registry() + {:ok, _pid} = WalExRegistry.start_registry() :ok end test "should set agent state" do name = WalExRegistry.set_name(:set_agent, __MODULE__, :app_name_test) - configs = [ - my_atom: :test_config, - my_number: 99, - my_string: "test config", - my_map: %{john: :doe}, - my_list: [1, 9, 9, 4] - ] - + configs = [] Agent.start_link(fn -> configs end, name: name) assert configs == WalExRegistry.get_state(:get_agent, __MODULE__, :app_name_test) diff --git a/test/walex/database_test.exs b/test/walex/database_test.exs index 67deb58..4879063 100644 --- a/test/walex/database_test.exs +++ b/test/walex/database_test.exs @@ -8,24 +8,34 @@ defmodule WalEx.DatabaseTest do @password "postgres" @database "todos_test" + @base_configs [ + name: :todos, + hostname: @hostname, + username: @username, + password: @password, + database: @database, + port: 5432, + subscriptions: ["user", "todo"], + publication: "events" + ] + describe "logical replication" do setup do - {:ok, pid} = start_database() + {:ok, database_pid} = start_database() - %{pid: pid} + %{database_pid: database_pid} end - test "should have logical replication set up", %{pid: pid} do + test "should have logical replication set up", %{database_pid: pid} do show_wall_level = "SHOW wal_level;" assert is_pid(pid) assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level) end - test "should start replication slot", %{pid: database_pid} do - assert {:ok, replication_pid} = WalExSupervisor.start_link(get_configs()) + test "should start replication slot", %{database_pid: database_pid} do + assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs) assert is_pid(replication_pid) - assert is_pid(database_pid) pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";" @@ -38,19 +48,6 @@ defmodule WalEx.DatabaseTest do end end - def get_configs do - [ - name: :todos, - hostname: @hostname, - username: @username, - password: @password, - database: @database, - port: 5432, - subscriptions: ["user", "todo"], - publication: "events" - ] - end - def start_database do Postgrex.start_link( hostname: @hostname, diff --git a/test/walex/decoder/decoder_test.exs b/test/walex/decoder/decoder_test.exs index 54a9f94..96e7257 100644 --- a/test/walex/decoder/decoder_test.exs +++ b/test/walex/decoder/decoder_test.exs @@ -77,7 +77,6 @@ defmodule WalEx.DecoderTest do ] } - # Adding assertion for "numeric" types, which was missing from the original implementation assert WalEx.Decoder.decode_message( <<82, 0, 0, 71, 92, 112, 117, 98, 108, 105, 99, 0, 116, 101, 109, 112, 0, 100, 0, 1, 0, 116, 101, 115, 116, 0, 0, 0, 6, 164, 255, 255, 255, 255>> diff --git a/test/walex/event_test.exs b/test/walex/event_test.exs new file mode 100644 index 0000000..b78994c --- /dev/null +++ b/test/walex/event_test.exs @@ -0,0 +1,145 @@ +defmodule WalEx.EventTest do + use ExUnit.Case, async: false + import WalEx.Support.TestHelpers + + alias WalEx.Supervisor, as: WalExSupervisor + + @app_name :test_app + @hostname "localhost" + @username "postgres" + @password "postgres" + @database "todos_test" + + @base_configs [ + name: @app_name, + hostname: @hostname, + username: @username, + password: @password, + database: @database, + port: 5432, + subscriptions: ["user", "todo"], + publication: ["events"], + modules: [TestApp.TestModule] + ] + + describe "process_all/1" do + setup do + {:ok, database_pid} = start_database() + {:ok, supervisor_pid} = WalExSupervisor.start_link(@base_configs) + + %{database_pid: database_pid, supervisor_pid: supervisor_pid} + end + + test "should successfully receive Transaction", %{database_pid: database_pid} do + events_pid = Process.whereis(WalEx.Events) + assert is_pid(events_pid) + + update_user(database_pid) + + # https://www.thegreatcodeadventure.com/testing-genservers-with-erlang-trace/ + :erlang.trace(events_pid, true, [:receive]) + + assert_receive { + :trace, + ^events_pid, + :receive, + {:"$gen_call", _pid_and_ref, + { + :process, + %WalEx.Changes.Transaction{ + changes: [ + %WalEx.Changes.UpdatedRecord{ + type: "UPDATE", + old_record: _old_record, + record: %{ + id: 1, + name: "John Doe", + age: 30, + created_at: _created_at, + email: "john.doe@example.com", + updated_at: _updated_at + }, + schema: "public", + table: "user", + columns: _columns, + commit_timestamp: _updated_record_commit_timestamp + } + ], + commit_timestamp: _transaction_commit_timestamp + }, + :test_app + }} + } + end + + test "should restart the Publisher & Events processes when error", %{ + database_pid: database_pid, + supervisor_pid: supervisor_pid + } do + events_pid = Process.whereis(WalEx.Events) + assert is_pid(events_pid) + + replication_supervisor_pid = + find_worker_pid(supervisor_pid, WalEx.Replication.Supervisor) + + assert is_pid(replication_supervisor_pid) + + replication_publisher_pid = + find_worker_pid(replication_supervisor_pid, WalEx.Replication.Publisher) + + assert is_pid(replication_publisher_pid) + + update_user(database_pid) + + # https://smartlogic.io/blog/test-process-monitoring/ + process_ref = Process.monitor(events_pid) + + assert_receive { + :DOWN, + ^process_ref, + :process, + ^events_pid, + {%RuntimeError{message: "Process error"}, _stacktrace} + } + + # Wait for supervisor to restart Events GenServer and Publisher + :timer.sleep(500) + + new_events_pid = Process.whereis(WalEx.Events) + + assert is_pid(new_events_pid) + refute events_pid == new_events_pid + + new_replication_publisher_pid = + find_worker_pid(replication_supervisor_pid, WalEx.Replication.Publisher) + + assert is_pid(new_replication_publisher_pid) + refute replication_publisher_pid == new_replication_publisher_pid + end + end + + defp update_user(database_pid) do + update_user = """ + UPDATE \"user\" SET age = 30 WHERE id = 1 + """ + + Postgrex.query!(database_pid, update_user, []) + end + + defp start_database do + Postgrex.start_link( + hostname: @hostname, + username: @username, + password: @password, + database: @database + ) + end +end + +defmodule TestApp.TestModule do + use WalEx.Event, name: :test_app + + def process_all(%WalEx.Changes.Transaction{}) do + raise RuntimeError, "Process error" + end +end diff --git a/test/walex/supervisor_test.exs b/test/walex/supervisor_test.exs index 7eed5db..a06749e 100644 --- a/test/walex/supervisor_test.exs +++ b/test/walex/supervisor_test.exs @@ -1,13 +1,43 @@ defmodule WalEx.SupervisorTest do use ExUnit.Case, async: false + import WalEx.Support.TestHelpers alias WalEx.Supervisor, as: WalExSupervisor + alias WalEx.Replication + + @base_configs [ + name: :test_name, + hostname: "hostname", + username: "username", + password: "password", + database: "todos_test", + port: 5432, + subscriptions: ["subscriptions"], + publication: "publication" + ] describe "start_link/2" do - test "should start a process" do - assert {:ok, pid} = WalExSupervisor.start_link(get_base_configs()) + test "should start Supervisor and child processes" do + assert {:ok, walex_supervisor_pid} = WalExSupervisor.start_link(@base_configs) + assert is_pid(walex_supervisor_pid) + + assert %{active: 6, workers: 5, supervisors: 1, specs: 6} = + Supervisor.count_children(walex_supervisor_pid) + + replication_supervisor_pid = + find_worker_pid(walex_supervisor_pid, Replication.Supervisor) + + assert is_pid(replication_supervisor_pid) + + replication_publisher_pid = + find_worker_pid(replication_supervisor_pid, Replication.Publisher) + + assert is_pid(replication_publisher_pid) - assert is_pid(pid) + replication_server_pid = + find_worker_pid(replication_supervisor_pid, Replication.Server) + + assert is_pid(replication_server_pid) end test "should raise if any required config is missing" do @@ -17,7 +47,7 @@ defmodule WalEx.SupervisorTest do end test "should start multiple supervision trees" do - configs_1 = get_base_configs() + configs_1 = @base_configs configs_2 = Keyword.put(configs_1, :name, :other_name) configs_3 = Keyword.put(configs_1, :name, :another_name) @@ -31,28 +61,8 @@ defmodule WalEx.SupervisorTest do end test "should start WalEx.Registry" do - assert {:ok, _pid} = WalExSupervisor.start_link(get_base_configs()) - - pid = Process.whereis(:walex_registry) - assert is_pid(pid) - end - end - - defp get_base_configs(keys \\ []) do - configs = [ - name: :test_name, - hostname: "hostname", - username: "username", - password: "password", - database: "todos_test", - port: 5432, - subscriptions: ["subscriptions"], - publication: "publication" - ] - - case keys do - [] -> configs - _keys -> Keyword.take(configs, keys) + assert {:ok, _pid} = WalExSupervisor.start_link(@base_configs) + assert is_pid(Process.whereis(:walex_registry)) end end end