Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix deleting device inbox when using background worker #16311

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16311.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete device messages asynchronously and in staged batches using the task scheduler.
62 changes: 31 additions & 31 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ def __init__(self, hs: "HomeServer"):
self._query_appservices_for_keys = (
hs.config.experimental.msc3984_appservice_key_query
)
self._task_scheduler = hs.get_task_scheduler()

self.device_list_updater = DeviceListWorkerUpdater(hs)

self._task_scheduler.register_action(
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)

@trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
"""
Expand Down Expand Up @@ -383,6 +388,32 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
"Trying handling device list state for partial join: not supported on workers."
)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 100

async def _delete_device_messages(
self,
task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
assert task.params is not None
user_id = task.params["user_id"]
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]

res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None


class DeviceHandler(DeviceWorkerHandler):
device_list_updater: "DeviceListUpdater"
Expand All @@ -394,7 +425,6 @@ def __init__(self, hs: "HomeServer"):
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool
self._task_scheduler = hs.get_task_scheduler()

self.device_list_updater = DeviceListUpdater(hs, self)

Expand Down Expand Up @@ -428,10 +458,6 @@ def __init__(self, hs: "HomeServer"):
self._delete_stale_devices,
)

self._task_scheduler.register_action(
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down Expand Up @@ -590,32 +616,6 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:

await self.notify_device_update(user_id, device_ids)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 100

async def _delete_device_messages(
self,
task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
assert task.params is not None
user_id = task.params["user_id"]
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]

res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None

async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
"""Update the given device

Expand Down
Loading