Fixup pusher pool notifications (#8287)
`pusher_pool.on_new_notifications` expected a min and max stream ID; however, that was not what we were passing in. Instead, let's just pass it the current max stream ID and have it track the last stream ID it was passed.

I believe this mostly worked, as we called the function for every event. However, it would break for events that got persisted out of order, i.e. events that were persisted but whose persistence did not advance the max stream ID because not all preceding events had finished persisting; push for such an event would be delayed until another event got pushed to the affected users.
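
To make the new contract concrete, here is a minimal sketch of the pattern the pool moves to: callers pass only the current max stream ID, and the pool remembers the last ID it was poked about and uses it as the lower bound of the range it checks. The classes and the `_users_with_push_actions` helper below are hypothetical stand-ins for illustration, not the actual Synapse code (which queries `get_push_action_users_in_range` on the store).

```python
from typing import Dict, List


class PusherSketch:
    """Stand-in for an individual pusher (e.g. HttpPusher/EmailPusher)."""

    def __init__(self) -> None:
        self.max_stream_ordering = 0

    def on_new_notifications(self, max_stream_ordering: int) -> None:
        # Individual pushers now only need the upper bound; they keep the
        # highest stream ordering they have been told about.
        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)


class PusherPoolSketch:
    """Illustrative only: mirrors the shape of the change, not Synapse itself."""

    def __init__(self, current_max_stream_id: int) -> None:
        # Seeded with the current max on startup, since every pusher checks
        # for anything it missed when it starts.
        self._last_room_stream_id_seen = current_max_stream_id
        self.pushers: Dict[str, List[PusherSketch]] = {}

    async def on_new_notifications(self, max_stream_id: int) -> None:
        if max_stream_id < self._last_room_stream_id_seen:
            # Poked about a stream ID we have already covered; nothing to do.
            return

        prev_stream_id = self._last_room_stream_id_seen
        self._last_room_stream_id_seen = max_stream_id

        # Hypothetical stand-in for store.get_push_action_users_in_range(...).
        users = await self._users_with_push_actions(prev_stream_id, max_stream_id)
        for user_id in users:
            for pusher in self.pushers.get(user_id, []):
                pusher.on_new_notifications(max_stream_id)

    async def _users_with_push_actions(self, low: int, high: int) -> List[str]:
        # Hypothetical helper; the real code asks the database which users
        # have push actions in the (low, high] stream ordering range.
        return []
```

Seeding the watermark to the current max stream ID on startup avoids re-notifying for everything each pusher has already caught up on when it started.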
erikjohnston authored Sep 9, 2020
1 parent dc9dcdb commit c9dbee5
Showing 8 changed files with 24 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/8287.bugfix
@@ -0,0 +1 @@
+Fix edge case where push could get delayed for a user until a later event was pushed.
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
@@ -2970,7 +2970,7 @@ async def _notify_persisted_event(
            event, event_stream_id, max_stream_id, extra_users=extra_users
        )

-        await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+        await self.pusher_pool.on_new_notifications(max_stream_id)

    async def _clean_room_for_join(self, room_id: str) -> None:
        """Called to clean up any data in DB for a given room, ready for the
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
@@ -1145,7 +1145,7 @@ def is_inviter_member_event(e):
        # If there's an expiry timestamp on the event, schedule its expiry.
        self._message_handler.maybe_schedule_expiry(event)

-        await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+        await self.pusher_pool.on_new_notifications(max_stream_id)

        def _notify():
            try:
2 changes: 1 addition & 1 deletion synapse/push/emailpusher.py
@@ -91,7 +91,7 @@ def on_stop(self):
            pass
        self.timed_call = None

-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
        if self.max_stream_ordering:
            self.max_stream_ordering = max(
                max_stream_ordering, self.max_stream_ordering
2 changes: 1 addition & 1 deletion synapse/push/httppusher.py
@@ -114,7 +114,7 @@ def on_started(self, should_check_for_notifs):
        if should_check_for_notifs:
            self._start_processing()

-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
        self.max_stream_ordering = max(
            max_stream_ordering, self.max_stream_ordering or 0
        )
19 changes: 16 additions & 3 deletions synapse/push/pusherpool.py
@@ -64,6 +64,12 @@ def __init__(self, hs: "HomeServer"):
        self._pusher_shard_config = hs.config.push.pusher_shard_config
        self._instance_name = hs.get_instance_name()

+        # Record the last stream ID that we were poked about so we can get
+        # changes since then. We set this to the current max stream ID on
+        # startup as every individual pusher will have checked for changes on
+        # startup.
+        self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
+
        # map from user id to app_id:pushkey to pusher
        self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]

@@ -178,20 +184,27 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
                )
                await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

-    async def on_new_notifications(self, min_stream_id, max_stream_id):
+    async def on_new_notifications(self, max_stream_id):
        if not self.pushers:
            # nothing to do here.
            return

+        if max_stream_id < self._last_room_stream_id_seen:
+            # Nothing to do
+            return
+
+        prev_stream_id = self._last_room_stream_id_seen
+        self._last_room_stream_id_seen = max_stream_id
+
        try:
            users_affected = await self.store.get_push_action_users_in_range(
-                min_stream_id, max_stream_id
+                prev_stream_id, max_stream_id
            )

            for u in users_affected:
                if u in self.pushers:
                    for p in self.pushers[u].values():
-                        p.on_new_notifications(min_stream_id, max_stream_id)
+                        p.on_new_notifications(max_stream_id)

        except Exception:
            logger.exception("Exception in pusher on_new_notifications")
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
@@ -154,7 +154,8 @@ async def on_rdata(
                max_token = self.store.get_room_max_stream_ordering()
                self.notifier.on_new_room_event(event, token, max_token, extra_users)

-                await self.pusher_pool.on_new_notifications(token, token)
+                max_token = self.store.get_room_max_stream_ordering()
+                await self.pusher_pool.on_new_notifications(max_token)

        # Notify any waiting deferreds. The list is ordered by position so we
        # just iterate through the list until we reach a position that is
1 change: 1 addition & 0 deletions tests/handlers/test_typing.py
@@ -80,6 +80,7 @@ def make_homeserver(self, reactor, clock):
                "get_user_directory_stream_pos",
                "get_current_state_deltas",
                "get_device_updates_by_remote",
+                "get_room_max_stream_ordering",
            ]
        )
