Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add presence_stream_id column to users_to_send_full_presence_to table
Browse files Browse the repository at this point in the history
We now return full presence in a /sync response based on the provided sync token,
rather than if the user has received the response already. This is necessary in
circumstances where users have more than one device - which they often do!

This comment also modifies the spot in PresenceHandler where we check if
the user should receive all presence. We only need to make the check if
from_key is not None. If from_key is None, the user will be receiving
all presence states anyways.
  • Loading branch information
anoadragon453 committed May 5, 2021
1 parent caee7da commit 46eccf1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 48 deletions.
34 changes: 12 additions & 22 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,22 +1412,22 @@ async def get_new_events(
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache

# Check if this user should receive all current, online user presence
user_in_users_to_send_full_presence_to = (
await self.store.is_user_in_users_to_send_full_presence_to(user_id)
)

with Measure(self.clock, "presence.get_new_events"):
if user_in_users_to_send_full_presence_to:
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

if from_key is not None:
from_key = int(from_key)

# Check if this user should receive all current, online user presence. We only
# bother to do this if from_key is set, as otherwise the user will receive all
# user presence anyways.
if await self.store.should_user_receive_full_presence_with_token(
user_id, from_key
):
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

max_token = self.store.get_current_presence_token()
if from_key == max_token:
# This is necessary as due to the way stream ID generators work
Expand Down Expand Up @@ -1461,12 +1461,6 @@ async def get_new_events(
user_id, include_offline, from_key
)

# Remove the user from the list of users to receive all presence
if user_in_users_to_send_full_presence_to:
await self.store.remove_user_from_users_to_send_full_presence_to(
user_id
)

return presence_updates, max_token

# Make mypy happy. users_interested_in should now be a set
Expand Down Expand Up @@ -1516,10 +1510,6 @@ async def get_new_events(
)
presence_updates = list(users_to_state.values())

# Remove the user from the list of users to receive all presence
if user_in_users_to_send_full_presence_to:
await self.store.remove_user_from_users_to_send_full_presence_to(user_id)

if not include_offline:
# Filter out offline presence states
presence_updates = self._filter_offline_presence_state(presence_updates)
Expand Down
50 changes: 25 additions & 25 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,62 +210,62 @@ async def get_presence_for_users(self, user_ids):

return {row["user_id"]: UserPresenceState(**row) for row in rows}

async def is_user_in_users_to_send_full_presence_to(self, user_id: str) -> bool:
"""Check whether the given user is one we need to send full presence to.
async def should_user_receive_full_presence_with_token(
self,
user_id: str,
from_token: int,
) -> bool:
"""Check whether the given user should receive full presence using the stream token
they're updating from.
Args:
user_id: The ID of the user to check.
from_token: The stream token included in their /sync token.
Returns:
True if the user should have full presence sent to them, False otherwise.
"""

def _is_user_in_users_to_send_full_presence_to_txn(txn):
def _should_user_receive_full_presence_with_token_txn(txn):
sql = """
SELECT 1 FROM users_to_send_full_presence_to
WHERE user_id = ?
AND presence_stream_id >= ?
"""
txn.execute(sql, (user_id,))
txn.execute(sql, (user_id, from_token))
return bool(txn.fetchone())

return await self.db_pool.runInteraction(
"is_user_in_users_to_send_full_presence_to",
_is_user_in_users_to_send_full_presence_to_txn,
"should_user_receive_full_presence_with_token",
_should_user_receive_full_presence_with_token_txn,
)

async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
"""Adds to the list of users who should receive a full snapshot of presence
upon their next sync.
Entries are kept for at least USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS,
and are culled whenever this method is called.
Args:
user_ids: An iterable of user IDs.
"""
# Add user entries to the table

# Add user entries to the table, updating the presence_stream_id column if the user already
# exists in the table.
await self.db_pool.simple_upsert_many(
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=(),
value_values=(),
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=(
(self._presence_id_gen.get_current_token(),) for _ in user_ids
),
desc="add_users_to_send_full_presence_to",
)

async def remove_user_from_users_to_send_full_presence_to(self, user_id: str):
"""Removes a user from those to send full presence to. This should only be done
once we have sent full presence to them following a successful sync.
Args:
user_id: The ID of the user to remove from the table.
"""
await self.db_pool.simple_delete(
table="users_to_send_full_presence_to",
keyvalues={"user_id": user_id},
desc="remove_user_from_users_to_send_full_presence_to",
)

async def get_presence_for_all_users(
self,
include_offline: bool = True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
-- The user ID to send full presence to.
user_id TEXT PRIMARY KEY,
-- A presence stream ID token - the current presence stream token when the row was last upserted.
-- If a user calls /sync and this token is part of the update they're to receive, we also include
-- full user presence in the response.
-- This allows multiple devices for a user to receive full presence whenever they next call /sync.
presence_stream_id BIGINT,
FOREIGN KEY (user_id)
REFERENCES users (name)
);
);

0 comments on commit 46eccf1

Please sign in to comment.