Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use _invalidate_cache_and_stream_bulk in more places. #16616

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog.d/16613.feature
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Improve the performance of claiming encryption keys in multi-worker deployments.
Improve the performance of some operations in multi-worker deployments.
1 change: 1 addition & 0 deletions changelog.d/16616.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve the performance of some operations in multi-worker deployments.
24 changes: 18 additions & 6 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,16 @@ def _add_account_data_for_user(
)

# Invalidate the cache for any ignored users which were added or removed.
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
self._invalidate_cache_and_stream_bulk(
txn,
self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in (
previously_ignored_users ^ currently_ignored_users
)
],
)
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

async def remove_account_data_for_user(
Expand Down Expand Up @@ -824,10 +832,14 @@ def _remove_account_data_for_user_txn(
)

# Invalidate the cache for ignored users which were removed.
for ignored_user_id in previously_ignored_users:
self._invalidate_cache_and_stream(
txn, self.ignored_by, (ignored_user_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in previously_ignored_users
],
)

# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
Expand Down
15 changes: 7 additions & 8 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,14 +1222,13 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
)

# Iterate the parent IDs and invalidate caches.
for parent_id in {r[1] for r in relations_to_insert}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
)
cache_tuples = {(r[1],) for r in relations_to_insert}
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
)

if results:
latest_event_id = results[-1][0]
Expand Down
17 changes: 10 additions & 7 deletions synapse/storage/databases/main/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ def store_server_keys_response_txn(txn: LoggingTransaction) -> None:
# invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id).
for key_id in verify_keys:
self._invalidate_cache_and_stream(
txn, self._get_server_keys_json, ((server_name, key_id),)
)
self._invalidate_cache_and_stream(
txn, self.get_server_key_json_for_remote, (server_name, key_id)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_server_keys_json,
[((server_name, key_id),) for key_id in verify_keys],
)
self._invalidate_cache_and_stream_bulk(
txn,
self.get_server_key_json_for_remote,
[(server_name, key_id) for key_id in verify_keys],
)

await self.db_pool.runInteraction(
"store_server_keys_response", store_server_keys_response_txn
Expand Down
9 changes: 5 additions & 4 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,11 @@ def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
for user_id in user_ids:
self._invalidate_cache_and_stream(
txn, self._get_full_presence_stream_token_for_user, (user_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_full_presence_stream_token_for_user,
[(user_id,) for user_id in user_ids],
)

return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
Expand Down
31 changes: 20 additions & 11 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,28 @@ def _purge_history_txn(
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")

for event_id, should_delete in event_rows:
self._invalidate_cache_and_stream(
txn, self._get_state_group_for_event, (event_id,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self._get_state_group_for_event,
[(event_id,) for event_id, _ in event_rows],
)

# XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the
# event is deleted from the database.
# XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the
# event is deleted from the database.
self._invalidate_cache_and_stream_bulk(
txn,
self.have_seen_event,
[
(room_id, event_id)
for event_id, should_delete in event_rows
if should_delete
],
)

for event_id, should_delete in event_rows:
if should_delete:
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
Comment on lines +318 to 320
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invalidates local caches, so I think is fine to leave in the loop.


logger.info("[purge] done")
Expand Down
20 changes: 10 additions & 10 deletions synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,16 +561,15 @@ def set_shadow_banned_txn(txn: LoggingTransaction) -> None:
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
tokens = self.db_pool.simple_select_onecol_txn(
tokens = self.db_pool.simple_select_list_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
retcol="token",
retcols=("token",),
)
Comment on lines +564 to +569
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of unpacking a tuple from the database (using the select_onecol version) and then repacking it into a tuple, it makes more sense to simply pass the tuples in.

self._invalidate_cache_and_stream_bulk(
txn, self.get_user_by_access_token, tokens
)
for token in tokens:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))

await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
Expand Down Expand Up @@ -2683,10 +2682,11 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, int, Optional[str]]]:
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]

for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
self._invalidate_cache_and_stream_bulk(
txn,
self.get_user_by_access_token,
[(token,) for token, _, _ in tokens_and_devices],
)

txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)

Expand Down
Loading