Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Revert "Fixup pusher pool notifications"
Browse files Browse the repository at this point in the history
This reverts commit e7fd336.
  • Loading branch information
erikjohnston committed Sep 9, 2020
1 parent e7fd336 commit dc9dcdb
Show file tree
Hide file tree
Showing 7 changed files with 8 additions and 23 deletions.
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2970,7 +2970,7 @@ async def _notify_persisted_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

await self.pusher_pool.on_new_notifications(max_stream_id)
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)

async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ def is_inviter_member_event(e):
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

await self.pusher_pool.on_new_notifications(max_stream_id)
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)

def _notify():
try:
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def on_stop(self):
pass
self.timed_call = None

def on_new_notifications(self, max_stream_ordering):
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def on_started(self, should_check_for_notifs):
if should_check_for_notifs:
self._start_processing()

def on_new_notifications(self, max_stream_ordering):
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
Expand Down
19 changes: 3 additions & 16 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ def __init__(self, hs: "HomeServer"):
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()

# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
# startup as every individual pusher will have checked for changes on
# startup.
self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()

# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]

Expand Down Expand Up @@ -184,27 +178,20 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_stream_id):
async def on_new_notifications(self, min_stream_id, max_stream_id):
if not self.pushers:
# nothing to do here.
return

if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return

prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id

try:
users_affected = await self.store.get_push_action_users_in_range(
prev_stream_id, max_stream_id
min_stream_id, max_stream_id
)

for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
p.on_new_notifications(max_stream_id)
p.on_new_notifications(min_stream_id, max_stream_id)

except Exception:
logger.exception("Exception in pusher on_new_notifications")
Expand Down
3 changes: 1 addition & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ async def on_rdata(
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)

max_token = self.store.get_room_max_stream_ordering()
await self.pusher_pool.on_new_notifications(max_token)
await self.pusher_pool.on_new_notifications(token, token)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
Expand Down
1 change: 0 additions & 1 deletion tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def make_homeserver(self, reactor, clock):
"get_user_directory_stream_pos",
"get_current_state_deltas",
"get_device_updates_by_remote",
"get_room_max_stream_ordering",
]
)

Expand Down

0 comments on commit dc9dcdb

Please sign in to comment.