[DPE-2955] Cross-region async replication integration tests (#453)
* Add async replication implementation

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Add async replication integration tests

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Add test for scaling

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Backup standby pgdata folder

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Fix OS call

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Fix unit tests

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Improve comments and logs

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Fix juju3 markers

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Revert permission change

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Add optional type hint

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Add relation name to secret label and revert poetry.lock

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

* Reload Patroni configuration when member is not ready yet

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>

---------

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>
marceloneppel committed May 3, 2024
1 parent 4310dc9 commit 2c25228
Showing 3 changed files with 713 additions and 51 deletions.
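For orientation before the diff: the helpers below gain model-aware parameters (model / extra_model) so a single test can drive both the primary cluster and the standby cluster of a cross-region async replication setup. The following is a minimal usage sketch only; the test name, the second_model fixture, and the import path are assumptions for illustration and are not part of this commit.

# Hypothetical usage sketch (not from this commit). Assumes a sibling test
# module next to helpers.py and a `second_model` juju.model.Model that points
# at the standby cluster's model.
from .helpers import (
    app_name,
    are_writes_increasing,
    check_writes,
    get_standby_leader,
    get_sync_standby,
    start_continuous_writes,
)


async def test_writes_replicate_across_models(ops_test, second_model):
    # Resolve the PostgreSQL application name in each model.
    primary_app = await app_name(ops_test)
    standby_app = await app_name(ops_test, model=second_model)

    # Start continuous writes against the primary cluster.
    await start_continuous_writes(ops_test, primary_app)

    # Writes must keep increasing on members of both models.
    await are_writes_increasing(ops_test, extra_model=second_model)

    # The standby cluster exposes a standby leader; the primary cluster
    # keeps a synchronous standby.
    assert await get_standby_leader(second_model, standby_app) is not None
    assert await get_sync_standby(ops_test, ops_test.model, primary_app) is not None

    # Stop the writer and confirm no writes were lost on any member.
    await check_writes(ops_test, extra_model=second_model)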
166 changes: 127 additions & 39 deletions tests/integration/ha_tests/helpers.py
@@ -12,6 +12,7 @@
import psycopg2
import requests
import yaml
from juju.model import Model
from pytest_operator.plugin import OpsTest
from tenacity import (
RetryError,
@@ -86,32 +87,47 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:


async def are_writes_increasing(
ops_test, down_unit: str = None, use_ip_from_inside: bool = False
ops_test, down_unit: str = None, use_ip_from_inside: bool = False, extra_model: Model = None
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
ops_test,
down_unit=down_unit,
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
with attempt:
more_writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
ops_test,
down_unit=down_unit,
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
assert more_writes[member] > count, f"{member}: writes not continuing to DB"
assert (
more_writes[member] > count
), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"


async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> Optional[str]:
async def app_name(
ops_test: OpsTest, application_name: str = "postgresql", model: Model = None
) -> Optional[str]:
"""Returns the name of the cluster running PostgreSQL.
This is important since not all deployments of the PostgreSQL charm have the application name
"postgresql".
Note: if multiple clusters are running PostgreSQL this will return the one first found.
"""
status = await ops_test.model.get_status()
for app in ops_test.model.applications:
if application_name in status["applications"][app]["charm"]:
if model is None:
model = ops_test.model
status = await model.get_status()
for app in model.applications:
if (
application_name in status["applications"][app]["charm"]
and APPLICATION_NAME not in status["applications"][app]["charm"]
):
return app

return None
@@ -207,13 +223,18 @@ async def is_cluster_updated(
), "secondary not up to date with the cluster after restarting."


async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:
async def check_writes(
ops_test, use_ip_from_inside: bool = False, extra_model: Model = None
) -> int:
"""Gets the total writes from the test charm and compares to the writes from db."""
total_expected_writes = await stop_continuous_writes(ops_test)
actual_writes, max_number_written = await count_writes(
ops_test, use_ip_from_inside=use_ip_from_inside
ops_test, use_ip_from_inside=use_ip_from_inside, extra_model=extra_model
)
for member, count in actual_writes.items():
print(
f"member: {member}, count: {count}, max_number_written: {max_number_written[member]}, total_expected_writes: {total_expected_writes}"
)
assert (
count == max_number_written[member]
), f"{member}: writes to the db were missed: count of actual writes different from the max number written."
@@ -222,30 +243,44 @@ async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:


async def count_writes(
ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False
ops_test: OpsTest,
down_unit: str = None,
use_ip_from_inside: bool = False,
extra_model: Model = None,
) -> Tuple[Dict[str, int], Dict[str, int]]:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
password = await get_password(ops_test, app, down_unit)
for unit in ops_test.model.applications[app].units:
if unit.name != down_unit:
cluster = get_patroni_cluster(
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
)
break
members = []
for model in [ops_test.model, extra_model]:
if model is None:
continue
for unit in model.applications[app].units:
if unit.name != down_unit:
members_data = get_patroni_cluster(
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
)["members"]
for index, member_data in enumerate(members_data):
members_data[index]["model"] = model.info.name
members.extend(members_data)
break
down_ips = []
if down_unit:
for unit in ops_test.model.applications[app].units:
if unit.name == down_unit:
down_ips.append(unit.public_address)
down_ips.append(await get_unit_ip(ops_test, unit.name))
return count_writes_on_members(members, password, down_ips)


def count_writes_on_members(members, password, down_ips) -> Tuple[Dict[str, int], Dict[str, int]]:
count = {}
maximum = {}
for member in cluster["members"]:
for member in members:
if member["role"] != "replica" and member["host"] not in down_ips:
host = member["host"]

@@ -254,12 +289,23 @@ async def count_writes(
f" host='{host}' password='{password}' connect_timeout=10"
)

with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member["name"]] = results[0]
maximum[member["name"]] = results[1]
connection.close()
member_name = f'{member["model"]}.{member["name"]}'
connection = None
try:
with psycopg2.connect(
connection_string
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member_name] = results[0]
maximum[member_name] = results[1]
except psycopg2.Error:
# Error raised when the connection is not possible.
count[member_name] = -1
maximum[member_name] = -1
finally:
if connection is not None:
connection.close()
return count, maximum


@@ -401,6 +447,42 @@ def get_random_unit(ops_test: OpsTest, app: str) -> str:
return random.choice(ops_test.model.applications[app].units).name


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.
Args:
model: the model instance.
application_name: the name of the application to get the value for.
Returns:
the name of the standby leader.
"""
first_unit_ip = model.applications[application_name].units[0].public_address
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "standby_leader":
return member["name"]


async def get_sync_standby(ops_test: OpsTest, model: Model, application_name: str) -> str:
"""Get the sync_standby name.
Args:
ops_test: the ops test instance.
model: the model instance.
application_name: the name of the application to get the value for.
Returns:
the name of the sync standby.
"""
any_unit = model.applications[application_name].units[0].name
first_unit_ip = await get_unit_ip(ops_test, any_unit, model)
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "sync_standby":
return member["name"]


async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str:
"""Use the charm action to retrieve the password from provided application.
@@ -417,20 +499,24 @@ async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str:
return action.results["password"]


async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str:
async def get_unit_ip(ops_test: OpsTest, unit_name: str, model: Model = None) -> str:
"""Wrapper for getting unit ip.
Args:
ops_test: The ops test object passed into every test case
unit_name: The name of the unit to get the address
model: Optional model instance to use
Returns:
The (str) ip of the unit
"""
application = unit_name.split("/")[0]
for unit in ops_test.model.applications[application].units:
if unit.name == unit_name:
break
return await instance_ip(ops_test, unit.machine.hostname)
if model is None:
application = unit_name.split("/")[0]
for unit in ops_test.model.applications[application].units:
if unit.name == unit_name:
break
return await instance_ip(ops_test, unit.machine.hostname)
else:
return get_unit_address(ops_test, unit_name)


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
@@ -673,24 +759,26 @@ async def is_secondary_up_to_date(
return True


async def start_continuous_writes(ops_test: OpsTest, app: str) -> None:
async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None:
"""Start continuous writes to PostgreSQL."""
# Start the process by relating the application to the database or
# by calling the action if the relation already exists.
if model is None:
model = ops_test.model
relations = [
relation
for relation in ops_test.model.applications[app].relations
for relation in model.applications[app].relations
if not relation.is_peer
and f"{relation.requires.application_name}:{relation.requires.name}"
== f"{APPLICATION_NAME}:first-database"
]
if not relations:
await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database")
await ops_test.model.wait_for_idle(status="active", timeout=1000)
await model.relate(app, f"{APPLICATION_NAME}:first-database")
await model.wait_for_idle(status="active", timeout=1000)
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True):
with attempt:
action = (
await ops_test.model.applications[APPLICATION_NAME]
await model.applications[APPLICATION_NAME]
.units[0]
.run_action("start-continuous-writes")
)
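
As a quick reference for the new count_writes_on_members helper shown above: it consumes plain member dictionaries and keys its results by "<model>.<unit name>". The sketch below is illustrative only; the member list is hand-written (real entries come from get_patroni_cluster, with the "model" key injected by count_writes), and unreachable members are reported as -1.

# Illustrative input/output for count_writes_on_members (values invented;
# real member data comes from Patroni via get_patroni_cluster, with "model"
# added by count_writes).
members = [
    {"model": "primary-model", "name": "postgresql/0", "role": "leader", "host": "10.0.0.10"},
    {"model": "primary-model", "name": "postgresql/1", "role": "sync_standby", "host": "10.0.0.11"},
    {"model": "standby-model", "name": "postgresql/0", "role": "standby_leader", "host": "10.1.0.10"},
]

count, maximum = count_writes_on_members(members, password="<charm password>", down_ips=[])
# count/maximum map "<model>.<unit name>" to COUNT(number) and MAX(number)
# read from the continuous_writes table, or to -1 when a member is unreachable.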