Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Sep 16, 2021
1 parent ac22650 commit 89fe879
Showing 1 changed file with 53 additions and 2 deletions.
55 changes: 53 additions & 2 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,34 +246,65 @@ async def _notify_interested_services_ephemeral(
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]],
):
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons.
#
# Instead we simply grab the latest typing updates for the service
# in _handle_typing.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "device_list_key":
events = await self._handle_device_list_updates(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# TODO: Are we doing this for device list updates?
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Given an application service, determine which events this appservice should receive
from the given typing event stream token and now.
Args:
service: The application service to check for which events it should receive.
new_token: A typing event stream token. Typing events between this token and
the current event stream token will be checked.
Wait this is the latest token...
Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to this appservice.
"""
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
Expand All @@ -286,6 +317,19 @@ async def _handle_typing(
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
"""
Given an application service, determine which events this appservice should receive
from the given typing event stream token and now.
Args:
service: The application service to check for which events it should receive.
new_token: A typing event stream token. Typing events between this token and
the current event stream token will be checked.
Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to this appservice.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
Expand Down Expand Up @@ -329,6 +373,13 @@ async def _handle_presence(

return events

async def _handle_device_list_updates(
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
) -> List[JsonDict]:
events: List[JsonDict] = []

async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
Expand Down

0 comments on commit 89fe879

Please sign in to comment.