
Commit

Backfill the thread_id column for event_push_actions and event_push_summary.
clokep committed Sep 12, 2022
1 parent c333a82 commit 9dc5e22
Showing 2 changed files with 84 additions and 0 deletions.
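The net effect of the commit is to stamp every pre-existing push action and summary row as belonging to the main timeline. The backfill runs in bounded batches, but it is equivalent in outcome to:

-- Illustrative only: what the batched background update amounts to.
UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;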
81 changes: 81 additions & 0 deletions synapse/storage/databases/main/event_push_actions.py
@@ -98,6 +98,7 @@
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

@@ -240,6 +241,86 @@ def __init__(
            unique=True,
        )

        self.db_pool.updates.register_background_update_handler(
            "event_push_backfill_thread_id",
            self._background_backfill_thread_id,
        )

    async def _background_backfill_thread_id(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        event_push_actions_done = progress.get("event_push_actions_done", False)

        def add_thread_id_txn(
            txn: LoggingTransaction, table_name: str, start_stream_ordering: int
        ) -> int:
            sql = f"""
            SELECT stream_ordering
            FROM {table_name}
            WHERE
                thread_id IS NULL
                AND stream_ordering > ?
            ORDER BY stream_ordering
            LIMIT ?
            """
            txn.execute(sql, (start_stream_ordering, batch_size))

            # No more rows to process.
            rows = txn.fetchall()
            if not rows:
                progress[f"{table_name}_done"] = True
                self.db_pool.updates._background_update_progress_txn(
                    txn, "event_push_backfill_thread_id", progress
                )
                return 0

            # Update the thread ID for any of those rows.
            max_stream_ordering = rows[-1][0]

            sql = f"""
            UPDATE {table_name}
            SET thread_id = 'main'
            WHERE stream_ordering <= ? AND thread_id IS NULL
            """
            txn.execute(sql, (max_stream_ordering,))

            # Update progress.
            processed_rows = txn.rowcount
            progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
            self.db_pool.updates._background_update_progress_txn(
                txn, "event_push_backfill_thread_id", progress
            )

            return processed_rows

        # First update the event_push_actions table, then the event_push_summary
        # table.
        #
        # Note that the event_push_actions_staging table is ignored since it is
        # assumed that items in that table will only exist for a short period of
        # time.
        if not event_push_actions_done:
            result = await self.db_pool.runInteraction(
                "event_push_backfill_thread_id",
                add_thread_id_txn,
                "event_push_actions",
                progress.get("max_event_push_actions_stream_ordering", 0),
            )
        else:
            result = await self.db_pool.runInteraction(
                "event_push_backfill_thread_id",
                add_thread_id_txn,
                "event_push_summary",
                progress.get("max_event_push_summary_stream_ordering", 0),
            )

            # Only done after the event_push_summary table is done.
            if not result:
                await self.db_pool.updates._end_background_update(
                    "event_push_backfill_thread_id"
                )

        return result

    @cached(tree=True, max_entries=5000)
    async def get_unread_event_push_actions_by_room_for_user(
        self,
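The handler above is a keyset-paginated backfill: each batch reads the next stream_ordering values that still lack a thread ID, updates everything up to the batch maximum, and persists that high-water mark so the update can resume after a restart. The following is a rough standalone sketch of the same pattern, not Synapse code: it assumes a hypothetical SQLite table with stream_ordering and thread_id columns, and it drives the loop directly rather than through Synapse's background-update scheduler.

import sqlite3

def backfill_thread_id(
    conn: sqlite3.Connection, table_name: str, batch_size: int = 100
) -> None:
    # High-water mark; Synapse stores the equivalent in the update's progress JSON.
    last_seen = 0
    while True:
        # Next batch of rows still missing a thread ID, in stream order.
        rows = conn.execute(
            f"SELECT stream_ordering FROM {table_name} "
            "WHERE thread_id IS NULL AND stream_ordering > ? "
            "ORDER BY stream_ordering LIMIT ?",
            (last_seen, batch_size),
        ).fetchall()
        if not rows:
            break  # table fully backfilled

        # Bound the UPDATE by the batch maximum so each transaction stays small.
        last_seen = rows[-1][0]
        conn.execute(
            f"UPDATE {table_name} SET thread_id = 'main' "
            "WHERE stream_ordering <= ? AND thread_id IS NULL",
            (last_seen,),
        )
        conn.commit()

Bounding each UPDATE by the batch maximum keeps transactions short, which is what lets the real update run online without holding long locks on busy tables.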
@@ -25,3 +25,6 @@ ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;
-- Update the unique index for `event_push_summary`.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7006, 'event_push_summary_unique_index2', '{}');

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');
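Both rows deliberately share ordering 7006; it is the depends_on column that guarantees the backfill only starts once event_push_summary_unique_index2 has finished. An operator who wants to check whether the backfill is still pending could, assuming direct read access to the homeserver database, inspect the registration row (it is deleted when the update completes):

SELECT update_name, progress_json, depends_on
FROM background_updates
WHERE update_name = 'event_push_backfill_thread_id';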
