Skip to content

Commit

Permalink
Merge pull request #31 from cpursley/replication-supervisor
Browse files Browse the repository at this point in the history
Replication supervisor
  • Loading branch information
cpursley committed Jan 4, 2024
2 parents 37e4ebd + a4222e6 commit 3bc2e32
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 113 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ walex-*.tar
.elixir_ls/

# VSCode
.vscode/
.vscode/

# .DS_Store files from macOS
.DS_Store
**/.DS_Store
21 changes: 16 additions & 5 deletions lib/walex/replication/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule WalEx.Replication.Publisher do
"""
use GenServer

alias WalEx.{Changes, Destinations, Events, Types}
alias WalEx.{Changes, Config, Destinations, Events, Types}
alias WalEx.Decoder.Messages

defmodule(State,
Expand All @@ -18,16 +18,27 @@ defmodule WalEx.Replication.Publisher do

defstruct [:relations]

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
def start_link(opts) do
name =
opts
|> Keyword.get(:app_name)
|> registry_name

GenServer.start_link(__MODULE__, opts, name: name)
end

def process_message(message, app_name) do
GenServer.cast(__MODULE__, %{message: message, app_name: app_name})
name = registry_name(app_name)

GenServer.cast(name, %{message: message, app_name: app_name})
end

defp registry_name(app_name) do
Config.Registry.set_name(:set_gen_server, __MODULE__, app_name)
end

@impl true
def init(_) do
def init(_opts) do
Process.flag(:message_queue_data, :off_heap)

{:ok, %State{}}
Expand Down
5 changes: 0 additions & 5 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule WalEx.Replication.Server do
@moduledoc """
This module is responsible for setting up the replication connection
"""

use Postgrex.ReplicationConnection

alias WalEx.Config.Registry, as: WalExRegistry
Expand Down Expand Up @@ -35,10 +34,6 @@ defmodule WalEx.Replication.Server do
def init(opts) do
app_name = Keyword.get(opts, :app_name)

if is_nil(Process.whereis(Publisher)) do
{:ok, _pid} = Publisher.start_link([])
end

{:ok, %{step: :disconnected, app_name: app_name}}
end

Expand Down
7 changes: 5 additions & 2 deletions lib/walex/replication/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule WalEx.Replication.Supervisor do

use Supervisor

alias WalEx.Replication.Server
alias WalEx.Replication.{Publisher, Server}

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
Expand All @@ -19,7 +19,10 @@ defmodule WalEx.Replication.Supervisor do
|> Keyword.get(:configs)
|> Keyword.get(:app_name)

children = [{Server, app_name: app_name}]
children = [
{Publisher, app_name: app_name},
{Server, app_name: app_name}
]

Supervisor.init(children, strategy: :one_for_all)
end
Expand Down
6 changes: 5 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule WalEx.MixProject do
aliases: aliases(),
name: "WalEx",
source_url: "https://github.com/cpursley/walex",
test_coverage: [tool: ExCoveralls]
test_coverage: [tool: ExCoveralls],
elixirc_paths: elixirc_paths(Mix.env())
]
end

Expand Down Expand Up @@ -72,4 +73,7 @@ defmodule WalEx.MixProject do
]
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]
end
12 changes: 12 additions & 0 deletions test/support/test_helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule WalEx.Support.TestHelpers do
def find_worker_pid(supervisor_pid, child_module) do
supervisor_pid
|> Supervisor.which_children()
|> find_pid(child_module)
end

defp find_pid(children, module_name) do
{_, pid, _, _} = Enum.find(children, fn {module, _, _, _} -> module == module_name end)
pid
end
end
68 changes: 25 additions & 43 deletions test/walex/config/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,29 @@ defmodule WalEx.ConfigTest do
alias WalEx.Config
alias Config.Registry, as: WalExRegistry

@base_configs [
name: :test_name,
hostname: "hostname",
username: "username",
password: "password",
database: "database",
port: 5432,
subscriptions: ["subscriptions"],
publication: "publication",
modules: [MyApp.CustomModule],
ssl: false,
ssl_opts: [verify: :verify_none]
]

setup_all do
assert {:ok, pid} = WalExRegistry.start_registry()
assert is_pid(pid)
{:ok, _pid} = WalExRegistry.start_registry()
:timer.sleep(1000)
:ok
end

describe "start_link/2" do
test "should start a process" do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
{:ok, pid} = Config.start_link(configs: @base_configs)
assert is_pid(pid)
end

Expand All @@ -26,7 +39,7 @@ defmodule WalEx.ConfigTest do
modules: ["modules"]
]

assert {:ok, pid} = Config.start_link(configs: configs)
{:ok, pid} = Config.start_link(configs: configs)
assert is_pid(pid)

assert [
Expand All @@ -52,8 +65,7 @@ defmodule WalEx.ConfigTest do

describe "get_configs/" do
setup do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -79,8 +91,7 @@ defmodule WalEx.ConfigTest do

describe "get_configs/2" do
setup do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -100,7 +111,7 @@ defmodule WalEx.ConfigTest do

test "should filter configs by process name" do
configs =
get_base_configs()
@base_configs
|> Keyword.replace(:name, :other_name)
|> Keyword.replace(:database, "other_database")

Expand All @@ -127,8 +138,7 @@ defmodule WalEx.ConfigTest do

describe "add_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -152,8 +162,7 @@ defmodule WalEx.ConfigTest do

describe "remove_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -175,8 +184,7 @@ defmodule WalEx.ConfigTest do

describe "replace_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -191,8 +199,7 @@ defmodule WalEx.ConfigTest do

describe "build_module_names/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand Down Expand Up @@ -220,8 +227,7 @@ defmodule WalEx.ConfigTest do

describe "to_module_name/1" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -237,28 +243,4 @@ defmodule WalEx.ConfigTest do
assert "TestName" == Config.to_module_name(:"Elixir.TestName")
end
end

defp get_base_configs(keys \\ []) do
configs = [
name: :test_name,
hostname: "hostname",
username: "username",
password: "password",
database: "database",
port: 5432,
subscriptions: ["subscriptions"],
publication: "publication",
modules: [MyApp.CustomModule],
ssl: false,
ssl_opts: [verify: :verify_none]
]

case keys do
[] ->
configs

_keys ->
Keyword.take(configs, keys)
end
end
end
13 changes: 3 additions & 10 deletions test/walex/config/registry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule WalEx.Config.RegistryTest do

describe "set_name/3" do
setup do
assert {:ok, _pid} = WalExRegistry.start_registry()
{:ok, _pid} = WalExRegistry.start_registry()
:ok
end

Expand All @@ -35,21 +35,14 @@ defmodule WalEx.Config.RegistryTest do

describe "get_state/3" do
setup do
assert {:ok, _pid} = WalExRegistry.start_registry()
{:ok, _pid} = WalExRegistry.start_registry()
:ok
end

test "should set agent state" do
name = WalExRegistry.set_name(:set_agent, __MODULE__, :app_name_test)

configs = [
my_atom: :test_config,
my_number: 99,
my_string: "test config",
my_map: %{john: :doe},
my_list: [1, 9, 9, 4]
]

configs = []
Agent.start_link(fn -> configs end, name: name)

assert configs == WalExRegistry.get_state(:get_agent, __MODULE__, :app_name_test)
Expand Down
35 changes: 16 additions & 19 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,34 @@ defmodule WalEx.DatabaseTest do
@password "postgres"
@database "todos_test"

@base_configs [
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events"
]

describe "logical replication" do
setup do
{:ok, pid} = start_database()
{:ok, database_pid} = start_database()

%{pid: pid}
%{database_pid: database_pid}
end

test "should have logical replication set up", %{pid: pid} do
test "should have logical replication set up", %{database_pid: pid} do
show_wall_level = "SHOW wal_level;"

assert is_pid(pid)
assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level)
end

test "should start replication slot", %{pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(get_configs())
test "should start replication slot", %{database_pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
assert is_pid(replication_pid)
assert is_pid(database_pid)

pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

Expand All @@ -38,19 +48,6 @@ defmodule WalEx.DatabaseTest do
end
end

def get_configs do
[
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events"
]
end

def start_database do
Postgrex.start_link(
hostname: @hostname,
Expand Down
1 change: 0 additions & 1 deletion test/walex/decoder/decoder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ defmodule WalEx.DecoderTest do
]
}

# Adding assertion for "numeric" types, which was missing from the original implementation
assert WalEx.Decoder.decode_message(
<<82, 0, 0, 71, 92, 112, 117, 98, 108, 105, 99, 0, 116, 101, 109, 112, 0, 100, 0, 1,
0, 116, 101, 115, 116, 0, 0, 0, 6, 164, 255, 255, 255, 255>>
Expand Down
Loading

0 comments on commit 3bc2e32

Please sign in to comment.