Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't keep old stream_ordering_to_exterm around #15382

Merged
merged 7 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15382.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve DB performance of clearing out old data from `stream_ordering_to_exterm`.
10 changes: 10 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ async def get_user_ids_changed(
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
# Check if the forward extremities have changed. If not then we know
# the current state won't have changed, and so we can skip this room.
try:
if not await self.store.have_room_forward_extremities_changed_since(
room_id, stream_ordering
):
continue
except errors.StoreError:
pass

current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
)
Expand Down
52 changes: 42 additions & 10 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,38 @@ def _get_min_depth_interaction(

return int(min_depth) if min_depth is not None else None

async def have_room_forward_extremities_changed_since(
self,
room_id: str,
stream_ordering: int,
) -> bool:
"""Check if the forward extremities in a room have changed since the
given stream ordering

Throws a StoreError if we have since purged the index for
stream_orderings from that point.
"""

if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined]
raise StoreError(400, f"stream_ordering too old {stream_ordering}")

sql = """
SELECT 1 FROM stream_ordering_to_exterm
WHERE stream_ordering > ? AND room_id = ?
LIMIT 1
"""

def have_room_forward_extremities_changed_since_txn(
txn: LoggingTransaction,
) -> bool:
txn.execute(sql, (stream_ordering, room_id))
return txn.fetchone() is not None

return await self.db_pool.runInteraction(
"have_room_forward_extremities_changed_since",
have_room_forward_extremities_changed_since_txn,
)

@cancellable
async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
Expand Down Expand Up @@ -1232,10 +1264,17 @@ def get_forward_extremeties_for_room_txn(txn: LoggingTransaction) -> List[str]:
txn.execute(sql, (stream_ordering, room_id))
return [event_id for event_id, in txn]

return await self.db_pool.runInteraction(
event_ids = await self.db_pool.runInteraction(
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)

# If we didn't find any IDs, then we must have cleared out the
# associated `stream_ordering_to_exterm`.
if not event_ids:
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))

return event_ids

Comment on lines +1271 to +1277
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new error condition, i.e. was it impossible for the interaction to return a falsey value before?

I think before this could fail, but we'd try to keep it working for the last month's worth of stream orderings?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not, and note that the one caller treats empty list and StoreError the same.

This is and was hit when we asked for the extremities at a recent ish stream ordering where we had purged all entries in the room before that stream ordering (we'd keep around one entry per room but that might be after that).

def _get_connected_batch_event_backfill_results_txn(
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
Expand Down Expand Up @@ -1664,19 +1703,12 @@ async def get_successor_events(self, event_id: str) -> List[str]:
@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
Comment on lines -1667 to -1668
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chesterton's Fence: do we know why this was here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to figure this out, but couldn't see any reason for it until last night.

I think it may have been to protect against the case of a room that hasn't been updated in the last month, where you want to return the most recent extremities, so that the caller then becomes a no-op.

Let me think how best to keep that optimisation.

sql = """
DELETE FROM stream_ordering_to_exterm
WHERE
room_id IN (
SELECT room_id
FROM stream_ordering_to_exterm
WHERE stream_ordering > ?
) AND stream_ordering < ?
WHERE stream_ordering < ?
"""
txn.execute(
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined]
sql, (self.stream_ordering_month_ago) # type: ignore[attr-defined]
)

await self.db_pool.runInteraction(
Expand Down