Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add automatic purge after all users forgotten a room
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten committed Apr 27, 2023
1 parent 6efa674 commit 20b2945
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 35 deletions.
1 change: 1 addition & 0 deletions changelog.d/15488.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add automatic purge after all users forgotten a room.
95 changes: 64 additions & 31 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __init__(self, hs: "HomeServer"):
self._retention_allowed_lifetime_max = (
hs.config.retention.retention_allowed_lifetime_max
)
self._redaction_retention_period = hs.config.server.redaction_retention_period
self._is_master = hs.config.worker.worker_app is None

if hs.config.retention.retention_enabled and self._is_master:
Expand All @@ -176,6 +177,44 @@ def __init__(self, hs: "HomeServer"):
job.longest_max_lifetime,
)

if self._is_master:
self.clock.looping_call(
run_as_background_process,
3600 * 1000,
"purge_rooms",
self.purge_rooms,
)

async def purge_rooms(self) -> None:
rooms_to_purge = await self.store.get_rooms_to_purge()
for r in rooms_to_purge:
if r["status"] == "complete":
# TODO cleanup the table
continue

room_id = r["room_id"]
delete_id = r["delete_id"]

delete_status = self._delete_by_id.get(delete_id)
if delete_status is not None:
# a purge background task is already running (or has run)
# for this delete id
continue

purge_now = False
if r["timestamp"] is None:
purge_now = True
else:
time_since_marked = self.clock.time_msec() - r["timestamp"]
if time_since_marked >= self._redaction_retention_period:
purge_now = True

# TODO 2 stages purge, keep memberships for a while so we don't "break" sync
if purge_now:
self._delete_by_id[delete_id] = DeleteStatus()
self._delete_by_room.setdefault(room_id, []).append(delete_id)
await self.purge_room(room_id, delete_id, True)

async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
) -> None:
Expand Down Expand Up @@ -399,22 +438,30 @@ def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
"""
return self._delete_by_room.get(room_id)

async def purge_room(self, room_id: str, force: bool = False) -> None:
async def purge_room(
self,
room_id: str,
delete_id: str,
force: bool = False,
) -> None:
"""Purge the given room from the database.
This function is part the delete room v1 API.
Args:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
logger.info("starting purge room_id %s", room_id)

async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
raise SynapseError(400, "Users are still joined to this room")

await self._storage_controllers.purge_events.purge_room(room_id)
await self._storage_controllers.purge_events.purge_room(room_id, delete_id)

logger.info("purge complete for room_id %s", room_id)

@trace
async def get_messages(
Expand Down Expand Up @@ -654,36 +701,22 @@ async def _shutdown_and_purge_room(

self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
].shutdown_room = await self._room_shutdown_handler.shutdown_room(
room_id=room_id,
requester_user_id=requester_user_id,
new_room_user_id=new_room_user_id,
new_room_name=new_room_name,
message=message,
block=block,
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING

if purge:
logger.info("starting purge room_id %s", room_id)

# first check that we have no users in this room
if not force_purge:
joined = await self.store.is_host_joined(
room_id, self._server_name
)
if joined:
raise SynapseError(
400, "Users are still joined to this room"
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
].shutdown_room = await self._room_shutdown_handler.shutdown_room(
room_id=room_id,
requester_user_id=requester_user_id,
new_room_user_id=new_room_user_id,
new_room_name=new_room_name,
message=message,
block=block,
)

await self._storage_controllers.purge_events.purge_room(room_id)
if purge:
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
await self.purge_room(room_id, delete_id, force_purge)

logger.info("purge complete for room_id %s", room_id)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
except Exception:
f = Failure()
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,9 @@ async def forget(self, user: UserID, room_id: str) -> None:
# the table `current_state_events` and `get_current_state_events` is `None`.
await self.store.forget(user_id, room_id)

if await self.store.is_locally_forgotten_room(room_id):
await self.store.upsert_room_to_purge(room_id, None, self.clock.time_msec())


def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
"""
Expand Down
7 changes: 6 additions & 1 deletion synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.types.state import StateFilter
from synapse.util import json_decoder
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.api.auth import Auth
Expand Down Expand Up @@ -356,10 +357,14 @@ async def _delete_room(
block=block,
)

delete_id = random_string(16)

# Purge room
if purge:
try:
await pagination_handler.purge_room(room_id, force=force_purge)
await pagination_handler.purge_room(
room_id, delete_id, force=force_purge
)
except NotFoundError:
if block:
# We can block unknown rooms with this endpoint, in which case
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,10 @@ async def on_GET(
class RoomForgetRestServlet(TransactionRestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.store = hs.get_datastores().main
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
self.clock = hs.get_clock()

def register(self, http_server: HttpServer) -> None:
PATTERNS = "/rooms/(?P<room_id>[^/]*)/forget"
Expand Down
11 changes: 10 additions & 1 deletion synapse/storage/controllers/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,23 @@ class PurgeEventsStorageController:

def __init__(self, hs: "HomeServer", stores: Databases):
self.stores = stores
self.clock = hs.get_clock()

async def purge_room(self, room_id: str) -> None:
async def purge_room(self, room_id: str, delete_id: str) -> None:
"""Deletes all record of a room"""

await self.stores.main.upsert_room_to_purge(
room_id, delete_id, self.clock.time_msec(), "purging"
)

with nested_logging_context(room_id):
state_groups_to_delete = await self.stores.main.purge_room(room_id)
await self.stores.state.purge_room_state(room_id, state_groups_to_delete)

await self.stores.main.upsert_room_to_purge(
room_id, delete_id, self.clock.time_msec(), "complete"
)

async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> None:
Expand Down
31 changes: 31 additions & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -1284,6 +1285,36 @@ async def is_locally_forgotten_room(self, room_id: str) -> bool:
# If any rows still exist it means someone has not forgotten this room yet
return not rows[0][0]

async def upsert_room_to_purge(
self,
room_id: str,
delete_id: Optional[str] = None,
timestamp: Optional[int] = None,
status: str = "waiting",
) -> None:
await self.db_pool.simple_upsert(
"rooms_to_purge",
{
"room_id": room_id,
"delete_id": delete_id,
},
{
"room_id": room_id,
"delete_id": delete_id,
"timestamp": timestamp,
"status": status,
},
desc="upsert_room_to_purge",
)

async def get_rooms_to_purge(self) -> List[Dict[str, Any]]:
return await self.db_pool.simple_select_list(
table="rooms_to_purge",
keyvalues={},
retcols=("room_id", "delete_id", "timestamp", "status"),
desc="rooms_to_purge_fetch",
)

async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
"""Get all rooms that the user has ever been in.
Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/schema/main/delta/76/03_rooms_to_purge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS rooms_to_purge(
room_id text NOT NULL,
delete_id text,
"timestamp" bigint,
"status" text NOT NULL,
UNIQUE(room_id, delete_id)
);
23 changes: 23 additions & 0 deletions tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from synapse.handlers.pagination import PaginationHandler, PurgeStatus
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -502,6 +503,9 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
)
self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/"

self.room_member_handler = hs.get_room_member_handler()
self.pagination_handler = hs.get_pagination_handler()

@parameterized.expand(
[
("DELETE", "/_synapse/admin/v2/rooms/%s"),
Expand Down Expand Up @@ -972,6 +976,25 @@ def test_shutdown_room_block_peek(self) -> None:
# Assert we can no longer peek into the room
self._assert_peek(self.room_id, expect_code=403)

@unittest.override_config({"redaction_retention_period": "0"})
def test_purge_forgotten_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.admin_user,
tok=self.admin_user_tok,
)

self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
self.get_success(
self.room_member_handler.forget(
UserID.from_string(self.admin_user), room_id
)
)

self.get_success(self.pagination_handler.purge_rooms())

self._is_purged(room_id)

def _is_blocked(self, room_id: str, expect: bool = True) -> None:
"""Assert that the room is blocked or not"""
d = self.store.is_room_blocked(room_id)
Expand Down
2 changes: 1 addition & 1 deletion tests/rest/admin/test_server_notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def test_send_server_notice_delete_room(self) -> None:
self.get_success(
self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user)
)
self.get_success(self.pagination_handler.purge_room(first_room_id))
self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id"))

# user is not member anymore
self._check_invite_and_join_status(self.other_user, 0, 0)
Expand Down
2 changes: 1 addition & 1 deletion tests/storage/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_purge_room(self) -> None:

# Purge everything before this topological token
self.get_success(
self._storage_controllers.purge_events.purge_room(self.room_id)
self._storage_controllers.purge_events.purge_room(self.room_id, "delete_id")
)

# The events aren't found.
Expand Down

0 comments on commit 20b2945

Please sign in to comment.