Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-4114] Test: Scale to zero units #347

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9d7bfed
deployment test "zero-units"
BalabaDmitri Feb 6, 2024
938035f
deployment test "zero-units"
BalabaDmitri Mar 4, 2024
7dc328b
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
b762ec8
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
0ca9740
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
04bc51c
run format & lint
BalabaDmitri Mar 12, 2024
171b53f
reduce time out
BalabaDmitri Mar 12, 2024
27c97f4
merge from remote main
BalabaDmitri Mar 12, 2024
8382d0d
remove replication storage list
BalabaDmitri Mar 12, 2024
d467d8c
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
526357b
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
4b64ce9
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
927ad24
run format & lint
BalabaDmitri Mar 12, 2024
a18b1d3
handle error: storage belongs to different cluster
BalabaDmitri Apr 3, 2024
18211ed
handle error: storage belongs to different cluster
BalabaDmitri Apr 4, 2024
d917d88
handling different versions of Postgres of unit
BalabaDmitri Apr 12, 2024
0a0486f
fix setting the postgresql version into app_peer_data
BalabaDmitri Apr 17, 2024
ab160f3
merge canonical/main
BalabaDmitri Apr 18, 2024
263a1ef
format
Apr 18, 2024
a1b24dd
fix record of postgres version in databags
BalabaDmitri Apr 27, 2024
19574bd
Merge remote-tracking branch 'canorigin/main' into deployment-zero-units
BalabaDmitri Apr 29, 2024
6873326
format & lint
BalabaDmitri Apr 29, 2024
6716eaf
merge canonical/postgresql-operator
BalabaDmitri May 7, 2024
41bfc2f
Merge remote-tracking branch 'canorigin/main' into deployment-zero-units
BalabaDmitri Jun 10, 2024
2b7db14
checking blocked status using the blocking message
BalabaDmitri Jun 11, 2024
ef84bf6
Merge branch 'main' of https://github.com/canonical/postgresql-operat…
BalabaDmitri Jun 11, 2024
e670781
Merge branch 'canonical_main' into deployment-zero-units
BalabaDmitri Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
TLS_CERT_FILE,
TLS_KEY_FILE,
UNIT_SCOPE,
UPGRADE_RELATION,
USER,
USER_PASSWORD_KEY,
)
Expand All @@ -96,6 +97,7 @@

PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit"
EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs"
DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of postgresql to use. You cannot use different versions of postgresql!"

Scopes = Literal[APP_SCOPE, UNIT_SCOPE]

Expand Down Expand Up @@ -144,6 +146,9 @@ def __init__(self, *args):
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.get_primary_action, self._on_get_primary)
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
self.framework.observe(
self.on[UPGRADE_RELATION].relation_changed, self._on_upgrade_relation_changed
)
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
Expand Down Expand Up @@ -497,6 +502,11 @@ def _on_peer_relation_changed(self, event: HookEvent):
try:
# Update the members of the cluster in the Patroni configuration on this unit.
self.update_config()
if self._patroni.cluster_system_id_mismatch(unit_name=self.unit.name):
self.unit.status = BlockedStatus(
"Failed to start postgresql. The storage belongs to a third-party cluster"
)
return
except RetryError:
self.unit.status = BlockedStatus("failed to update cluster members on member")
return
Expand Down Expand Up @@ -537,6 +547,19 @@ def _on_peer_relation_changed(self, event: HookEvent):

self._update_new_unit_status()

self._validate_database_version()

def _on_upgrade_relation_changed(self, event: "HookEvent"):
    """Record the workload version in the app databag after an upgrade settles.

    Only the leader writes to the application peer databag. While an
    upgrade is still in progress the event is deferred, so the version is
    recorded once the cluster has converged on a single PostgreSQL version.

    Args:
        event: the upgrade peer relation-changed event being handled.
    """
    if not self.unit.is_leader():
        # Only the leader may write to the application peer databag
        # (via _set_workload_version).
        return

    if not self.upgrade.idle:
        # Bug fix: the original deferred when the upgrade was *idle* —
        # exactly when it was safe to proceed — while logging "upgrade in
        # progress". Defer only while an upgrade is actually in progress.
        logger.debug("Defer _on_upgrade_relation_changed: upgrade in progress")
        event.defer()
        return

    self._set_workload_version(self._patroni.get_postgresql_version())

# Split off into separate function, because of complexity _on_peer_relation_changed
def _start_stop_pgbackrest_service(self, event: HookEvent) -> None:
# Start or stop the pgBackRest TLS server service when TLS certificate change.
Expand Down Expand Up @@ -1034,7 +1057,7 @@ def _on_start(self, event: StartEvent) -> None:

self.unit_peer_data.update({"ip": self.get_hostname_by_unit(None)})

self.unit.set_workload_version(self._patroni.get_postgresql_version())
self._set_workload_version(self._patroni.get_postgresql_version())

# Open port
try:
Expand Down Expand Up @@ -1640,6 +1663,26 @@ def client_relations(self) -> List[Relation]:
relations.append(relation)
return relations

def _set_workload_version(self, psql_version):
    """Expose the workload version to Juju and mirror it in the app databag.

    Every unit reports the version to Juju; only the leader additionally
    records it under the "database-version" key of the application peer
    data, which other units later validate against.

    Args:
        psql_version: the PostgreSQL version string to record.
    """
    self.unit.set_workload_version(psql_version)
    if not self.unit.is_leader():
        # Followers cannot write to the application peer databag.
        return
    self.app_peer_data.update({"database-version": psql_version})

def _validate_database_version(self):
    """Block the unit when it runs a different PostgreSQL version than the app.

    The leader seeds the "database-version" key of the application peer
    databag on first run; afterwards every unit compares the version
    reported by Patroni against that record and enters BlockedStatus on a
    mismatch.
    """
    recorded_version = self.app_peer_data.get("database-version")

    # First run on the leader: publish the version instead of validating it.
    if recorded_version is None and self.unit.is_leader():
        detected_version = self._patroni.get_postgresql_version()
        if detected_version is not None:
            self.app_peer_data.update({"database-version": detected_version})
        return

    if self._patroni.get_postgresql_version() != recorded_version:
        self.unit.status = BlockedStatus(DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE)
        return


if __name__ == "__main__":
main(PostgresqlOperatorCharm)
37 changes: 37 additions & 0 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Helper class used to manage cluster lifecycle."""

import glob
import logging
import os
import pwd
Expand Down Expand Up @@ -713,3 +714,39 @@ def update_synchronous_node_count(self, units: int = None) -> None:
# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def cluster_system_id_mismatch(self, unit_name: str) -> bool:
    """Check whether Patroni refused the storage as belonging to another cluster.

    Scans the most recent Patroni log file for the CRITICAL "system ID
    mismatch" error, which Patroni emits when the attached storage was
    initialized by a different cluster.

    Args:
        unit_name: the Juju unit name (e.g. "postgresql/0") to look for.

    Returns:
        True if the last Patroni log reports that this node's storage
        belongs to a different cluster, False otherwise.
    """
    last_log_file = self._last_patroni_log_file()
    # Patroni logs the node under its hostname-style name: "/" becomes "-".
    node_name = unit_name.replace("/", "-")
    return (
        f" CRITICAL: system ID mismatch, node {node_name} belongs to a different cluster:"
        in last_log_file
    )

def _last_patroni_log_file(self) -> str:
    """Return the content of the most recently modified Patroni log file.

    Returns:
        The text of the newest ``*.log`` file under ``PATRONI_LOGS_PATH``,
        or an empty string when no log file exists or reading it fails.
    """
    candidates = glob.glob(f"{PATRONI_LOGS_PATH}/*.log")
    if not candidates:
        return ""
    newest = max(candidates, key=os.path.getmtime)
    try:
        with open(newest) as log_file:
            return log_file.read()
    except OSError as e:
        # Best effort: a vanished/unreadable file is reported as "no logs".
        logger.exception("Failed to read last patroni log file", exc_info=e)
        return ""
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
LEGACY_DB = "db"
LEGACY_DB_ADMIN = "db-admin"
PEER = "database-peers"
UPGRADE_RELATION = "upgrade"
ALL_CLIENT_RELATIONS = [DATABASE, LEGACY_DB, LEGACY_DB_ADMIN]
ALL_LEGACY_RELATIONS = [LEGACY_DB, LEGACY_DB_ADMIN]
API_REQUEST_TIMEOUT = 5
Expand Down
107 changes: 105 additions & 2 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
PATRONI_SERVICE_DEFAULT_PATH = f"/etc/systemd/system/{SERVICE_NAME}"
RESTART_CONDITION = "no"
ORIGINAL_RESTART_CONDITION = "always"
SECOND_APPLICATION = "second-cluster"


class MemberNotListedOnClusterError(Exception):
Expand Down Expand Up @@ -897,11 +898,20 @@ def storage_id(ops_test, unit_name):
return line.split()[1]


async def add_unit_with_storage(ops_test, app, storage):
async def add_unit_with_storage(
ops_test, app, storage, is_blocked: bool = False, blocked_message: str = ""
):
"""Adds unit with storage.

Note: this function exists as a temporary solution until this issue is resolved:
https://github.com/juju/python-libjuju/issues/695

Args:
ops_test: The ops test framework instance
app: The name of the application
storage: Unique storage identifier
is_blocked: Checking blocked status
blocked_message: Check message in blocked status
"""
expected_units = len(ops_test.model.applications[app].units) + 1
prev_units = [unit.name for unit in ops_test.model.applications[app].units]
Expand All @@ -910,7 +920,22 @@ async def add_unit_with_storage(ops_test, app, storage):
return_code, _, _ = await ops_test.juju(*add_unit_cmd)
assert return_code == 0, "Failed to add unit with storage"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000)
if is_blocked:
assert (
is_blocked and blocked_message != ""
), "The blocked status check should be checked along with the message"
application = ops_test.model.applications[app]
await ops_test.model.block_until(
lambda: any(
unit.workload_status == "blocked"
and unit.workload_status_message == blocked_message
for unit in application.units
),
# "blocked" in {unit.workload_status for unit in application.units},
timeout=1500,
)
else:
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500)
assert (
len(ops_test.model.applications[app].units) == expected_units
), "New unit not added to model"
Expand Down Expand Up @@ -954,6 +979,84 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) ->
return True


async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""):
    """Returns a PostgreSQL connection string.

    Args:
        ops_test: The ops test framework instance
        dbname: The name of the database
        is_primary: Whether to connect to the primary unit (default True);
            when False, ``replica_unit_name`` selects the target unit
        replica_unit_name: The name of the replica unit to target

    Returns:
        Tuple of (libpq connection string, name of the unit it points at).
    """
    unit_name = await get_primary(ops_test, APP_NAME)
    password = await get_password(ops_test, APP_NAME)
    address = get_unit_address(ops_test, unit_name)
    if not is_primary and replica_unit_name != "":
        unit_name = replica_unit_name
        # Bug fix: Application.units is a list of Unit objects, not a mapping
        # keyed by unit name, so it cannot be indexed with the unit's name.
        # Look the unit up by name instead.
        for unit in ops_test.model.applications[APP_NAME].units:
            if unit.name == replica_unit_name:
                address = unit.public_address
                break
    connection_string = (
        f"dbname='{dbname}' user='operator'"
        f" host='{address}' password='{password}' connect_timeout=10"
    )
    return connection_string, unit_name


async def validate_test_data(connection_string):
    """Assert that the test table still contains the expected row.

    Args:
        connection_string: Database connection string
    """
    with psycopg2.connect(connection_string) as connection:
        connection.autocommit = True
        with connection.cursor() as cursor:
            cursor.execute("SELECT data FROM test;")
            row = cursor.fetchone()
            assert row[0] == "some data"
    # psycopg2's context manager ends the transaction but does not close
    # the connection, so close it explicitly.
    connection.close()


async def create_test_data(connection_string):
    """Create and populate the test table, verifying the write round-trips.

    Args:
        connection_string: Database connection string
    """
    with psycopg2.connect(connection_string) as connection:
        connection.autocommit = True
        with connection.cursor() as cursor:
            # Recreate the table so the helper is idempotent across runs,
            # then confirm the inserted row can be read back.
            cursor.execute("DROP TABLE IF EXISTS test;")
            cursor.execute("CREATE TABLE test(data TEXT);")
            cursor.execute("INSERT INTO test(data) VALUES('some data');")
            cursor.execute("SELECT data FROM test;")
            row = cursor.fetchone()
            assert row[0] == "some data"
    # psycopg2's context manager ends the transaction but does not close
    # the connection, so close it explicitly.
    connection.close()


async def get_last_added_unit(ops_test, app, prev_units):
    """Return the unit that was added since ``prev_units`` was captured.

    Args:
        ops_test: The ops test framework instance
        app: The name of the application
        prev_units: List of unit names before adding the last unit

    Returns:
        The newly added unit object.
    """
    units = ops_test.model.applications[app].units
    # Name(s) present now but absent from the earlier snapshot.
    fresh_names = {unit.name for unit in units}.difference(prev_units)
    newest = list(fresh_names)[0]
    return next((unit for unit in units if unit.name == newest), None)


async def is_storage_exists(ops_test: OpsTest, storage_id: str) -> bool:
"""Returns True if storage exists by provided storage ID."""
complete_command = [
Expand Down
36 changes: 18 additions & 18 deletions tests/integration/ha_tests/test_restore_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging

import pytest
Expand All @@ -16,13 +17,13 @@
set_password,
)
from .helpers import (
SECOND_APPLICATION,
add_unit_with_storage,
reused_full_cluster_recovery_storage,
storage_id,
)

FIRST_APPLICATION = "first-cluster"
SECOND_APPLICATION = "second-cluster"

logger = logging.getLogger(__name__)

Expand All @@ -37,24 +38,23 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
charm = await ops_test.build_charm(".")
async with ops_test.fast_forward():
# Deploy the first cluster with reusable storage
await ops_test.model.deploy(
charm,
application_name=FIRST_APPLICATION,
num_units=3,
series=CHARM_SERIES,
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
await asyncio.gather(
ops_test.model.deploy(
charm,
application_name=FIRST_APPLICATION,
num_units=3,
series=CHARM_SERIES,
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
),
ops_test.model.deploy(
charm,
application_name=SECOND_APPLICATION,
num_units=1,
series=CHARM_SERIES,
config={"profile": "testing"},
),
)

# Deploy the second cluster
await ops_test.model.deploy(
charm,
application_name=SECOND_APPLICATION,
num_units=1,
series=CHARM_SERIES,
config={"profile": "testing"},
)

await ops_test.model.wait_for_idle(status="active", timeout=1500)

# TODO have a better way to bootstrap clusters with existing storage
Expand Down
Loading
Loading