Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle half-created indices in receipts index background update #14650

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,48 @@ def register_background_index_update(
The named index will be dropped upon completion of the new index.
"""

async def updater(progress: JsonDict, batch_size: int) -> int:
await self.create_index_in_background(
index_name=index_name,
table=table,
columns=columns,
where_clause=where_clause,
unique=unique,
psql_only=psql_only,
replaces_index=replaces_index,
)
await self._end_background_update(update_name)
return 1

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)

async def create_index_in_background(
self,
index_name: str,
table: str,
columns: Iterable[str],
where_clause: Optional[str] = None,
unique: bool = False,
psql_only: bool = False,
replaces_index: Optional[str] = None,
) -> None:
"""Add an index in the background.

Args:
update_name: update_name to register for
index_name: name of index to add
table: table to add index to
columns: columns/expressions to include in index
where_clause: A WHERE clause to specify a partial unique index.
unique: true to make a UNIQUE index
psql_only: true to only create this index on psql databases (useful
for virtual sqlite tables)
replaces_index: The name of an index that this index replaces.
The named index will be dropped upon completion of the new index.
"""

def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
Expand Down Expand Up @@ -618,16 +660,11 @@ def create_index_sqlite(conn: Connection) -> None:
else:
runner = create_index_sqlite

async def updater(progress: JsonDict, batch_size: int) -> int:
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)
await self._end_background_update(update_name)
return 1
if runner is None:
return

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)

async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
Expand Down
18 changes: 12 additions & 6 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,9 +999,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_linearized_unique_index",
"receipts_linearized",
await self.db_pool.updates.create_index_in_background(
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down Expand Up @@ -1050,9 +1053,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_graph_unique_index",
"receipts_graph",
Comment on lines -1053 to -1055
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is _create_receipts_index now dead code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot, thanks.

await self.db_pool.updates.create_index_in_background(
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down