Sliding sync minor performance speed up using new table #17787

Open
wants to merge 3 commits into base: develop
1 change: 1 addition & 0 deletions changelog.d/17787.misc
@@ -0,0 +1 @@
+Sliding sync minor performance speed up using new table.
35 changes: 25 additions & 10 deletions synapse/handlers/sliding_sync/__init__.py
@@ -49,6 +49,7 @@
     Requester,
     SlidingSyncStreamToken,
     StateMap,
+    StrCollection,
     StreamKeyType,
     StreamToken,
 )
@@ -293,27 +294,41 @@ async def handle_room(room_id: str) -> None:
         # to record rooms as having updates even if there might not actually
         # be anything new for the user (e.g. due to event filters, events
         # having happened after the user left, etc).
-        unsent_room_ids = []
         if from_token:
             # The set of rooms that the client (may) care about, but aren't
             # in any list range (or subscribed to).
-            missing_rooms = all_rooms - relevant_room_map.keys()
+            missing_rooms = {
+                room_id
+                for room_id in all_rooms - relevant_room_map.keys()
+                # We only care about updates to *joined* rooms
+                if room_membership_for_user_map[room_id].membership
+                == Membership.JOIN
[Member Author commented on the line above: "Actually we need to make sure we handle leaves that happen in this section"]
+            }
 
             # We now just go and try fetching any events in the above rooms
             # to see if anything has happened since the `from_token`.
             #
             # TODO: Replace this with something faster. When we land the
             #       sliding sync tables that record the most recent event
             #       positions we can use that.
-            missing_event_map_by_room = (
-                await self.store.get_room_events_stream_for_rooms(
-                    room_ids=missing_rooms,
-                    from_key=to_token.room_key,
-                    to_key=from_token.stream_token.room_key,
-                    limit=1,
+            unsent_room_ids: StrCollection
+            if await self.store.have_finished_sliding_sync_background_jobs():
+                unsent_room_ids = await (
+                    self.store.get_rooms_that_have_updates_since_sliding_sync_table(
+                        room_ids=missing_rooms,
+                        from_key=from_token.stream_token.room_key,
+                    )
                 )
-            )
-            unsent_room_ids = list(missing_event_map_by_room)
+            else:
+                missing_event_map_by_room = (
+                    await self.store.get_room_events_stream_for_rooms(
+                        room_ids=missing_rooms,
+                        from_key=to_token.room_key,
+                        to_key=from_token.stream_token.room_key,
+                        limit=1,
+                    )
+                )
+                unsent_room_ids = list(missing_event_map_by_room)
 
             new_connection_state.rooms.record_unsent_rooms(
                 unsent_room_ids, from_token.stream_token.room_key
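For readers skimming the diff, the handler change boils down to the condensed, runnable sketch below. The two store methods are the ones touched by this PR; `FakeStore`, its stubbed return values, and the integer stream positions are invented for illustration:

# Condensed model of the fast-path/fallback branch added above. Note that
# over-reporting a room as updated is safe here: unsent rooms are only
# recorded so that a later sync re-checks them.
import asyncio
from typing import AbstractSet, Collection


class FakeStore:
    """Stand-in for the handler's real store (stubbed for the demo)."""

    async def have_finished_sliding_sync_background_jobs(self) -> bool:
        # Pretend the background population of the new tables is complete.
        return True

    async def get_rooms_that_have_updates_since_sliding_sync_table(
        self, room_ids: AbstractSet[str], from_key: int
    ) -> Collection[str]:
        # Stubbed: in Synapse this is one indexed read of
        # sliding_sync_joined_rooms (see the storage change below).
        return {room_id for room_id in room_ids if room_id != "!quiet:example.org"}


async def compute_unsent_room_ids(
    store: FakeStore, missing_rooms: AbstractSet[str], from_key: int
) -> Collection[str]:
    if await store.have_finished_sliding_sync_background_jobs():
        # Fast path: ask the new table which rooms moved past `from_key`.
        return await store.get_rooms_that_have_updates_since_sliding_sync_table(
            room_ids=missing_rooms, from_key=from_key
        )
    # Slow path (the pre-PR behaviour): fetch up to one event per room via
    # get_room_events_stream_for_rooms; not modelled in this sketch.
    raise NotImplementedError


rooms = {"!busy:example.org", "!quiet:example.org"}
print(asyncio.run(compute_unsent_room_ids(FakeStore(), rooms, from_key=5)))
# -> {'!busy:example.org'}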
42 changes: 42 additions & 0 deletions synapse/storage/databases/main/stream.py
@@ -751,6 +751,48 @@ def get_rooms_that_changed(
             if self._events_stream_cache.has_entity_changed(room_id, from_id)
         }
 
+    async def get_rooms_that_have_updates_since_sliding_sync_table(
+        self,
+        room_ids: StrCollection,
+        from_key: RoomStreamToken,
+    ) -> StrCollection:
+        """Return the rooms that probably have had updates since the given
+        token."""
+        # If the stream change cache is valid for the stream token, we can just
+        # use the result of that.
+        if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
+            return self._events_stream_cache.get_entities_changed(
+                room_ids, from_key.stream
+            )
+
+        def get_rooms_that_have_updates_since_sliding_sync_table_txn(
+            txn: LoggingTransaction,
+        ) -> StrCollection:
+            sql = """
+                SELECT room_id
+                FROM sliding_sync_joined_rooms
+                WHERE {clause}
+                    AND event_stream_ordering > ?
+            """
+
+            results: Set[str] = set()
+            for batch in batch_iter(room_ids, 1000):
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "room_id", batch
+                )
+
+                args.append(from_key.stream)
+                txn.execute(sql.format(clause=clause), args)
+
+                results.update(row[0] for row in txn)
+
+            return results
+
+        return await self.db_pool.runInteraction(
+            "get_rooms_that_have_updates_since_sliding_sync_table",
+            get_rooms_that_have_updates_since_sliding_sync_table_txn,
+        )
+
     async def paginate_room_events_by_stream_ordering(
         self,
         *,
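As a toy illustration of what the transaction above does, here is the same query shape run against an in-memory SQLite database. The table and column names come from the diff; the schema details and sample data are invented, and the real table of course lives in Synapse's main database:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE sliding_sync_joined_rooms (
        room_id TEXT PRIMARY KEY,              -- one row per joined room
        event_stream_ordering BIGINT NOT NULL  -- latest event position seen
    )
    """
)
conn.executemany(
    "INSERT INTO sliding_sync_joined_rooms VALUES (?, ?)",
    [("!a:example.org", 10), ("!b:example.org", 25), ("!c:example.org", 40)],
)

# "Which of these rooms (probably) changed since stream position 20?"
room_ids = ["!a:example.org", "!b:example.org", "!c:example.org"]
clause = "room_id IN ({})".format(", ".join("?" for _ in room_ids))
rows = conn.execute(
    f"SELECT room_id FROM sliding_sync_joined_rooms "
    f"WHERE {clause} AND event_stream_ordering > ?",
    [*room_ids, 20],
)
print({row[0] for row in rows})  # {'!b:example.org', '!c:example.org'}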
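Finally, the transaction leans on two Synapse utilities whose behaviour the diff implies but does not show: `batch_iter` chunks the room list so no single query exceeds parameter limits, and `make_in_list_sql_clause` builds a parameterised `IN (...)` clause. Below is a rough sketch of that behaviour, not the real implementations (the real `make_in_list_sql_clause` also takes the database engine as its first argument, as seen in the diff, so it can emit an array-based clause on PostgreSQL):

# Rough, simplified stand-ins for the helpers used in the txn above.
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def batch_iter(iterable: Iterable[str], size: int) -> Iterator[Tuple[str, ...]]:
    """Yield successive chunks of at most `size` items."""
    it = iter(iterable)
    while batch := tuple(islice(it, size)):
        yield batch


def make_in_list_sql_clause(
    column: str, values: Iterable[str]
) -> Tuple[str, List[str]]:
    """Build a parameterised `column IN (?, ?, ...)` clause plus its args."""
    args = list(values)
    placeholders = ", ".join("?" for _ in args)
    return f"{column} IN ({placeholders})", args


# 2,500 room IDs become three queries of at most 1,000 parameters each:
room_ids = [f"!r{i}:example.org" for i in range(2500)]
for batch in batch_iter(room_ids, 1000):
    clause, args = make_in_list_sql_clause("room_id", batch)
    print(clause[:20], "...", len(args))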