Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert internal pusher dicts to attrs #8940

Merged
merged 8 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8940.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to push module.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ files =
synapse/state,
synapse/storage/databases/main/appservice.py,
synapse/storage/databases/main/events.py,
synapse/storage/databases/main/pusher.py,
synapse/storage/databases/main/registration.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
Expand Down
60 changes: 53 additions & 7 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,70 @@
# limitations under the License.

import abc
from typing import TYPE_CHECKING, Any, Dict
from typing import TYPE_CHECKING, Any, Dict, Optional

from synapse.types import RoomStreamToken
import attr

from synapse.types import JsonDict, RoomStreamToken

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer


@attr.s(slots=True)
class PusherConfig:
"""Parameters necessary to configure a pusher."""

id = attr.ib(type=Optional[str])
user_name = attr.ib(type=str)
access_token = attr.ib(type=Optional[int])
profile_tag = attr.ib(type=str)
kind = attr.ib(type=str)
app_id = attr.ib(type=str)
app_display_name = attr.ib(type=str)
device_display_name = attr.ib(type=str)
pushkey = attr.ib(type=str)
ts = attr.ib(type=int)
lang = attr.ib(type=Optional[str])
data = attr.ib(type=Optional[JsonDict])
last_stream_ordering = attr.ib(type=Optional[int])
last_success = attr.ib(type=Optional[int])
failing_since = attr.ib(type=Optional[int])

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
"app_display_name": self.app_display_name,
"app_id": self.app_id,
"data": self.data,
"device_display_name": self.device_display_name,
"kind": self.kind,
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
}


@attr.s(slots=True)
class ThrottleParams:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Parameters for controlling the rate of sending pushes via email."""

last_sent_ts = attr.ib(type=int)
throttle_ms = attr.ib(type=int)


class Pusher(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self.hs = hs
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()

self.pusher_id = pusherdict["id"]
self.user_id = pusherdict["user_name"]
self.app_id = pusherdict["app_id"]
self.pushkey = pusherdict["pushkey"]
self.pusher_id = pusher_config.id
self.user_id = pusher_config.user_name
self.app_id = pusher_config.app_id
self.pushkey = pusher_config.pushkey

self.last_stream_ordering = pusher_config.last_stream_ordering

# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
Expand Down
27 changes: 14 additions & 13 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional

from twisted.internet.base import DelayedCall
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher
from synapse.push import Pusher, PusherConfig, ThrottleParams
from synapse.push.mailer import Mailer

if TYPE_CHECKING:
Expand Down Expand Up @@ -60,15 +60,14 @@ class EmailPusher(Pusher):
factor out the common parts
"""

def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any], mailer: Mailer):
super().__init__(hs, pusherdict)
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer):
super().__init__(hs, pusher_config)
self.mailer = mailer

self.store = self.hs.get_datastore()
self.email = pusherdict["pushkey"]
self.last_stream_ordering = pusherdict["last_stream_ordering"]
self.email = pusher_config.pushkey
self.timed_call = None # type: Optional[DelayedCall]
self.throttle_params = {} # type: Dict[str, Dict[str, int]]
self.throttle_params = {} # type: Dict[str, ThrottleParams]
self._inited = False

self._is_processing = False
Expand Down Expand Up @@ -132,6 +131,7 @@ async def _process(self) -> None:

if not self._inited:
# this is our first loop: load up the throttle params
assert self.pusher_id is not None
self.throttle_params = await self.store.get_throttle_params_by_room(
self.pusher_id
)
Expand All @@ -157,6 +157,7 @@ async def _unsafe_process(self) -> None:
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
assert start is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering
)
Expand Down Expand Up @@ -244,13 +245,13 @@ def seconds_until(self, ts_msec: int) -> float:

def get_room_throttle_ms(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["throttle_ms"]
return self.throttle_params[room_id].throttle_ms
else:
return 0

def get_room_last_sent_ts(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["last_sent_ts"]
return self.throttle_params[room_id].last_sent_ts
else:
return 0

Expand Down Expand Up @@ -301,10 +302,10 @@ async def sent_notif_update_throttle(
new_throttle_ms = min(
current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
)
self.throttle_params[room_id] = {
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
self.throttle_params[room_id] = ThrottleParams(
self.clock.time_msec(), new_throttle_ms,
)
assert self.pusher_id is not None
await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
)
Expand Down
36 changes: 18 additions & 18 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.events import EventBase
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfigException
from synapse.push import Pusher, PusherConfig, PusherConfigException

from . import push_rule_evaluator, push_tools

Expand Down Expand Up @@ -62,33 +62,29 @@ class HttpPusher(Pusher):
# This one's in ms because we compare it against the clock
GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000

def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
super().__init__(hs, pusherdict)
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
super().__init__(hs, pusher_config)
self.storage = self.hs.get_storage()
self.app_display_name = pusherdict["app_display_name"]
self.device_display_name = pusherdict["device_display_name"]
self.pushkey_ts = pusherdict["ts"]
self.data = pusherdict["data"]
self.last_stream_ordering = pusherdict["last_stream_ordering"]
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict["failing_since"]
self.failing_since = pusher_config.failing_since
self.timed_call = None
self._is_processing = False
self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room

if "data" not in pusherdict:
raise PusherConfigException("No 'data' key for HTTP pusher")
self.data = pusherdict["data"]
self.data = pusher_config.data
if self.data is None:
raise PusherConfigException("'data' key can not be null for HTTP pusher")

self.name = "%s/%s/%s" % (
pusherdict["user_name"],
pusherdict["app_id"],
pusherdict["pushkey"],
pusher_config.user_name,
pusher_config.app_id,
pusher_config.pushkey,
)

if self.data is None:
raise PusherConfigException("data can not be null for HTTP pusher")

# Validate that there's a URL and it is of the proper form.
if "url" not in self.data:
raise PusherConfigException("'url' required in data for HTTP pusher")
Expand Down Expand Up @@ -180,6 +176,7 @@ async def _unsafe_process(self) -> None:
Never call this directly: use _process which will only allow this to
run once per pusher.
"""
assert self.last_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
Expand Down Expand Up @@ -208,6 +205,7 @@ async def _unsafe_process(self) -> None:
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
assert self.last_stream_ordering is not None
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
Expand Down Expand Up @@ -314,6 +312,8 @@ async def _build_notification_dict(
# or may do so (i.e. is encrypted so has unknown effects).
priority = "high"

# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
d = {
"notification": {
Expand Down
24 changes: 12 additions & 12 deletions synapse/push/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from typing import TYPE_CHECKING, Callable, Dict, Optional

from synapse.push import Pusher
from synapse.push import Pusher, PusherConfig
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.mailer import Mailer
Expand All @@ -34,7 +34,7 @@ def __init__(self, hs: "HomeServer"):

self.pusher_types = {
"http": HttpPusher
} # type: Dict[str, Callable[[HomeServer, dict], Pusher]]
} # type: Dict[str, Callable[[HomeServer, PusherConfig], Pusher]]

logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
if hs.config.email_enable_notifs:
Expand All @@ -47,18 +47,18 @@ def __init__(self, hs: "HomeServer"):

logger.info("defined email pusher type")

def create_pusher(self, pusherdict: Dict[str, Any]) -> Optional[Pusher]:
kind = pusherdict["kind"]
def create_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
kind = pusher_config.kind
f = self.pusher_types.get(kind, None)
if not f:
return None
logger.debug("creating %s pusher for %r", kind, pusherdict)
return f(self.hs, pusherdict)
logger.debug("creating %s pusher for %r", kind, pusher_config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the attrs class give a sane repr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attrs gives decent reprs, something like (the data is made up...):

"PusherConfig(id=1, user_name='@clokep:matrix.org', access_token=2, profile_tag='tag', kind='http', app_id='foobarbaz', app_display_name='Element Web', device_display_name='MBP', pushkey='asdffdsa', ts=1608127939, lang='en', data={'url': 'https://matrix.org/_matrix/push/v1/send'}, last_stream_ordering=0, last_success=1608127939, failing_since=None)"

return f(self.hs, pusher_config)

def _create_email_pusher(
self, _hs: "HomeServer", pusherdict: Dict[str, Any]
self, _hs: "HomeServer", pusher_config: PusherConfig
) -> EmailPusher:
app_name = self._app_name_from_pusherdict(pusherdict)
app_name = self._app_name_from_pusherdict(pusher_config)
mailer = self.mailers.get(app_name)
if not mailer:
mailer = Mailer(
Expand All @@ -68,10 +68,10 @@ def _create_email_pusher(
template_text=self._notif_template_text,
)
self.mailers[app_name] = mailer
return EmailPusher(self.hs, pusherdict, mailer)
return EmailPusher(self.hs, pusher_config, mailer)

def _app_name_from_pusherdict(self, pusherdict: Dict[str, Any]) -> str:
data = pusherdict["data"]
def _app_name_from_pusherdict(self, pusher_config: PusherConfig) -> str:
data = pusher_config.data

if isinstance(data, dict):
brand = data.get("brand")
Expand Down
Loading