Skip to content

Commit

Permalink
NAS-131224 / 25.04 / Fix long-standing alert issue on HA (#14526)
Browse files Browse the repository at this point in the history
* simplify and optimize plugins/alert.py::__run_source

* fix alerts

* add __run_other_node_alert_source

* address review
  • Loading branch information
yocalebo committed Sep 17, 2024
1 parent f171de7 commit 961b26d
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 90 deletions.
4 changes: 1 addition & 3 deletions src/middlewared/middlewared/alert/source/failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ class FailoverAlertSource(AlertSource):
run_on_backup_node = False

async def check(self):
if not await self.middleware.call('failover.licensed'):
return []
elif not await self.middleware.call('failover.internal_interfaces'):
if not await self.middleware.call('failover.internal_interfaces'):
return [Alert(FailoverInterfaceNotFoundAlertClass)]

try:
Expand Down
3 changes: 1 addition & 2 deletions src/middlewared/middlewared/alert/source/failover_disks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class FailoverDisksAlertSource(AlertSource):
run_on_backup_node = False

async def check(self):
licensed = await self.middleware.call('failover.licensed')
if licensed and (md := await self.middleware.call('failover.mismatch_disks')):
if (md := await self.middleware.call('failover.mismatch_disks')):
if md['missing_remote']:
return [Alert(
DisksAreNotPresentOnStandbyNodeAlertClass, {'serials': ', '.join(md['missing_remote'])}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ class FailoverCriticalAlertSource(AlertSource):
run_on_backup_node = False

async def check(self):
licensed = await self.middleware.call('failover.licensed')
if licensed and not await self.middleware.call('interface.query', [('failover_critical', '=', True)]):
if not await self.middleware.call('interface.query', [('failover_critical', '=', True)]):
return [Alert(NoCriticalFailoverInterfaceFoundAlertClass)]
else:
return []
3 changes: 1 addition & 2 deletions src/middlewared/middlewared/alert/source/failover_nics.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class FailoverNetworkCardsAlertSource(AlertSource):
run_on_backup_node = False

async def check(self):
licensed = await self.middleware.call('failover.licensed')
if licensed and (interfaces := await self.middleware.call('failover.mismatch_nics')):
if (interfaces := await self.middleware.call('failover.mismatch_nics')):
if interfaces['missing_remote']:
return [Alert(
NetworkCardsMismatchOnStandbyNodeAlertClass, {'interfaces': ', '.join(interfaces['missing_remote'])}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ def __init__(self, middleware):
self.incident_id = None

async def check(self):
if not await self.middleware.call('failover.licensed'):
return []

try:
await self.middleware.call('failover.call_remote', 'core.ping', [], {'timeout': 2})
except Exception:
Expand Down
173 changes: 95 additions & 78 deletions src/middlewared/middlewared/plugins/alert.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from dataclasses import dataclass
from collections import defaultdict, namedtuple
import copy
from datetime import datetime, timezone
import errno
from itertools import zip_longest
import os
import textwrap
import time
Expand Down Expand Up @@ -38,17 +40,23 @@
from middlewared.utils.plugins import load_modules, load_classes
from middlewared.utils.python import get_middlewared_dir
from middlewared.utils.time_utils import utc_now

from middlewared.plugins.failover_.remote import NETWORK_ERRORS

POLICIES = ["IMMEDIATELY", "HOURLY", "DAILY", "NEVER"]
DEFAULT_POLICY = "IMMEDIATELY"

ALERT_SOURCES = {}
ALERT_SERVICES_FACTORIES = {}
SEND_ALERTS_ON_READY = False

AlertSourceLock = namedtuple("AlertSourceLock", ["source_name", "expires_at"])

SEND_ALERTS_ON_READY = False

# Snapshot of the HA (failover) facts needed by a single alert run; instances are
# built by __get_failover_info. The dataclass is frozen, slotted and keyword-only,
# so it cannot be mutated after creation and must be constructed with named fields.
@dataclass(slots=True, frozen=True, kw_only=True)
class AlertFailoverInfo:
# Label of the node this code runs on: "A" by default, "B" when failover is
# licensed and `failover.node` reports anything other than "A".
this_node: str
# Label of the peer node; always the opposite label of `this_node`.
other_node: str
# True only when the peer answered the remote calls and reported the same
# `system.version` as this node, system state "READY" and failover status "BACKUP".
run_on_backup_node: bool
# Gate for alert sources marked `failover_related`: when False they are skipped.
# False when failover is not licensed; otherwise it appears to depend on
# `blocked_failover_alerts_until` having elapsed — NOTE(review): confirm nesting,
# the indentation of the producing code is not visible here.
run_failover_related: bool


class AlertModel(sa.Model):
Expand Down Expand Up @@ -655,31 +663,80 @@ async def __should_run_or_send_alerts(self):

return True

async def __run_alerts(self):
master_node = "A"
backup_node = "B"
product_type = await self.middleware.call("alert.product_type")
run_on_backup_node = False
run_failover_related = False
if product_type == "SCALE_ENTERPRISE":
if await self.middleware.call("failover.licensed"):
if await self.middleware.call("failover.node") == "B":
master_node = "B"
backup_node = "A"
async def __get_failover_info(self):
this_node, other_node = "A", "B"
run_on_backup_node = run_failover_related = False
run_failover_related = await self.middleware.call("failover.licensed")
if run_failover_related:
if await self.middleware.call("failover.node") != "A":
this_node, other_node = "B", "A"

run_failover_related = time.monotonic() > self.blocked_failover_alerts_until
if run_failover_related:
try:
remote_version = await self.middleware.call("failover.call_remote", "system.version")
remote_system_state = await self.middleware.call("failover.call_remote", "system.state")
remote_failover_status = await self.middleware.call("failover.call_remote",
"failover.status")
args = ([], {"connect_timeout": 2})
rem_ver = await self.middleware.call("failover.call_remote", "system.version", *args)
rem_state = await self.middleware.call("failover.call_remote", "system.state", *args)
rem_fstat = await self.middleware.call("failover.call_remote", "failover.status", *args)
except Exception:
pass
else:
if remote_version == await self.middleware.call("system.version"):
if remote_system_state == "READY" and remote_failover_status == "BACKUP":
run_on_backup_node = True
run_on_backup_node = all((
await self.middleware.call("system.version") == rem_ver,
rem_state == "READY",
rem_fstat == "BACKUP",
))

return AlertFailoverInfo(
this_node=this_node,
other_node=other_node,
run_on_backup_node=run_on_backup_node,
run_failover_related=run_failover_related
)

run_failover_related = time.monotonic() > self.blocked_failover_alerts_until
async def __handle_locked_alert_source(self, name, this_node, other_node):
    """Return the previously stored alerts of a blocked alert source, split by node.

    Looks up `self.blocked_sources[name]`. When that value is falsy the source
    is not blocked and two empty lists are returned. When it is truthy, a debug
    message is logged and every alert in `self.alerts` whose `source` equals
    `name` is placed in the list for `this_node` or `other_node` according to
    its `node` attribute; alerts belonging to neither node are ignored.

    Returns a 3-tuple `(this_node_alerts, other_node_alerts, locked)` where
    `locked` is the raw `self.blocked_sources[name]` value (used by the caller
    only for its truthiness).
    """
    blocked = self.blocked_sources[name]
    local_alerts = []
    remote_alerts = []

    # Guard clause: nothing to carry over when the source is free to run.
    if not blocked:
        return local_alerts, remote_alerts, blocked

    self.logger.debug("Not running alert source %r because it is blocked", name)
    for alert in self.alerts:
        if alert.source != name:
            continue
        if alert.node == this_node:
            local_alerts.append(alert)
        elif alert.node == other_node:
            remote_alerts.append(alert)

    return local_alerts, remote_alerts, blocked

async def __run_other_node_alert_source(self, name):
    """Run alert source `name` on the other HA node and rebuild its alerts locally.

    Calls `alert.run_source` on the remote node through `failover.call_remote`
    and converts each returned alert dict back into an `Alert` object.

    Returns a flat list of `Alert` objects:
      * the alerts reported by the remote node on success;
      * an empty list when the remote call raises a `CallError` whose errno is
        in `NETWORK_ERRORS` or equals `CallError.EALERTCHECKERUNAVAILABLE`, or
        when `ReserveFDException` is raised (logged at debug level);
      * a single `AlertSourceRunFailedOnBackupNodeAlertClass` alert for any
        other failure, including a `CallError` with a different errno.
    """
    # Only these keys of a serialized alert are forwarded unchanged to Alert();
    # "klass", "source" and "key" need translation and are passed explicitly.
    keys = ("args", "datetime", "last_occurrence", "dismissed", "mail")
    other_node_alerts = []
    try:
        try:
            remote_alerts = await self.middleware.call(
                "failover.call_remote", "alert.run_source", [name]
            )
            for alert in remote_alerts:
                # Append the Alert itself, not a one-element list wrapping it:
                # the caller assigns `.node` on every item and hands it to
                # __handle_alert(), which requires Alert objects.
                other_node_alerts.append(Alert(
                    **{k: v for k, v in alert.items() if k in keys},
                    klass=AlertClass.class_by_name[alert["klass"]],
                    _source=alert["source"],
                    _key=alert["key"],
                ))
        except CallError as e:
            # The peer being unreachable / not ready is expected; stay silent.
            if e.errno not in NETWORK_ERRORS + (CallError.EALERTCHECKERUNAVAILABLE,):
                raise
    except ReserveFDException:
        self.logger.debug('Failed to reserve a privileged port')
    except Exception as e:
        # Any unexpected failure is surfaced as a single alert instead of
        # whatever was collected so far.
        other_node_alerts = [Alert(
            AlertSourceRunFailedOnBackupNodeAlertClass,
            args={"source_name": name, "traceback": str(e)},
            _source=name
        )]

    return other_node_alerts

async def __run_alerts(self):
product_type = await self.middleware.call("alert.product_type")
fi = await self.__get_failover_info()
for k, source_lock in list(self.sources_locks.items()):
if source_lock.expires_at <= time.monotonic():
await self.unblock_source(k)
Expand All @@ -688,77 +745,37 @@ async def __run_alerts(self):
if product_type not in alert_source.products:
continue

if alert_source.failover_related and not run_failover_related:
if alert_source.failover_related and not fi.run_failover_related:
continue

if not alert_source.schedule.should_run(utc_now(), self.alert_source_last_run[alert_source.name]):
continue

self.alert_source_last_run[alert_source.name] = utc_now()

alerts_a = [alert
for alert in self.alerts
if alert.node == master_node and alert.source == alert_source.name]
locked = False
if self.blocked_sources[alert_source.name]:
self.logger.debug("Not running alert source %r because it is blocked", alert_source.name)
locked = True
else:
this_node_alerts, other_node_alerts, locked = await self.__handle_locked_alert_source(
alert_source.name, fi.this_node, fi.other_node
)
if not locked:
self.logger.trace("Running alert source: %r", alert_source.name)

try:
alerts_a = await self.__run_source(alert_source.name)
this_node_alerts = await self.__run_source(alert_source.name)
except UnavailableException:
pass
for alert in alerts_a:
alert.node = master_node

alerts_b = []
if run_on_backup_node and alert_source.run_on_backup_node:
try:
alerts_b = [alert
for alert in self.alerts
if alert.node == backup_node and alert.source == alert_source.name]
try:
if not locked:
alerts_b = await self.middleware.call("failover.call_remote", "alert.run_source",
[alert_source.name])

alerts_b = [Alert(**dict({k: v for k, v in alert.items()
if k in ["args", "datetime", "last_occurrence", "dismissed",
"mail"]},
klass=AlertClass.class_by_name[alert["klass"]],
_source=alert["source"],
_key=alert["key"]))
for alert in alerts_b]
except CallError as e:
if e.errno in [errno.ECONNABORTED, errno.ECONNREFUSED, errno.ECONNRESET, errno.EHOSTDOWN,
errno.ETIMEDOUT, CallError.EALERTCHECKERUNAVAILABLE]:
pass
else:
raise
except ReserveFDException:
self.logger.debug('Failed to reserve a privileged port')
except Exception as e:
alerts_b = [
Alert(AlertSourceRunFailedOnBackupNodeAlertClass,
args={
"source_name": alert_source.name,
"traceback": str(e),
},
_source=alert_source.name)
]

for alert in alerts_b:
alert.node = backup_node
if fi.run_on_backup_node and alert_source.run_on_backup_node:
other_node_alerts = await self.__run_other_node_alert_source(alert_source.name)

for alert in alerts_a + alerts_b:
self.__handle_alert(alert)
for talert, oalert in zip_longest(this_node_alerts, other_node_alerts, fillvalue=None):
if talert is not None:
talert.node = fi.this_node
self.__handle_alert(talert)
if oalert is not None:
oalert.node = fi.other_node
self.__handle_alert(oalert)

self.alerts = (
[a for a in self.alerts if a.source != alert_source.name] +
alerts_a +
alerts_b
[a for a in self.alerts if a.source != alert_source.name] + this_node_alerts + other_node_alerts
)

def __handle_alert(self, alert):
Expand Down

0 comments on commit 961b26d

Please sign in to comment.