From 89fe879fe6a1db294a3934ccd303bffa58ef1259 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 16 Sep 2021 07:15:39 +0100 Subject: [PATCH] wip --- synapse/handlers/appservice.py | 55 ++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d3394cba52a..ab706fc61e1b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -246,20 +246,26 @@ async def _notify_interested_services_ephemeral( stream_key: str, new_token: Optional[int], users: Collection[Union[str, UserID]], - ): + ) -> None: logger.debug("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: # Only handle typing if we have the latest token if stream_key == "typing_key" and new_token is not None: + # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # for typing_key due to performance reasons. + # + # Instead we simply grab the latest typing updates for the service + # in _handle_typing. events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - # We don't persist the token for typing_key for performance reasons elif stream_key == "receipt_key": events = await self._handle_receipts(service) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + + # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( service, "read_receipt", new_token ) @@ -267,6 +273,17 @@ async def _notify_interested_services_ephemeral( events = await self._handle_presence(service, users) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) + elif stream_key == "device_list_key": + events = await self._handle_device_list_updates(service, users) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + + # TODO: Are we doing this for device list updates? await self.store.set_type_stream_id_for_appservice( service, "presence", new_token ) @@ -274,6 +291,20 @@ async def _notify_interested_services_ephemeral( async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: + """ + Given an application service, determine which events this appservice should receive + from the given typing event stream token and now. + + Args: + service: The application service to check for which events it should receive. + new_token: A typing event stream token. Typing events between this token and + the current event stream token will be checked. + Wait this is the latest token... + + Returns: + A list of JSON dictionaries containing data derived from the typing events that + should be sent to this appservice. + """ typing_source = self.event_sources.sources["typing"] # Get the typing events from just before current typing, _ = await typing_source.get_new_events_as( @@ -286,6 +317,19 @@ async def _handle_typing( return typing async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + """ + Given an application service, determine which events this appservice should receive + from the given typing event stream token and now. + + Args: + service: The application service to check for which events it should receive. + new_token: A typing event stream token. Typing events between this token and + the current event stream token will be checked. + + Returns: + A list of JSON dictionaries containing data derived from the typing events that + should be sent to this appservice. + """ from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) @@ -329,6 +373,13 @@ async def _handle_presence( return events + async def _handle_device_list_updates( + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + events: List[JsonDict] = [] + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists.