Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow retrieving the relations of a redacted event #12130

Merged
merged 24 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
995c7b2
Requesting of relations of redacted events is allowed.
clokep Feb 9, 2022
1b182a0
Refactor to keep a map of input event ID -> event.
clokep Mar 1, 2022
a1fe62a
Include bundled aggregations for redacted events.
clokep Mar 1, 2022
0ed4bac
Add tests for thread relations.
clokep Mar 1, 2022
05ecc61
Newsfragment
clokep Mar 2, 2022
f9ed38c
Pass both event_id and event for caching.
clokep Mar 8, 2022
c97c4a8
Fix incorrect redaction parameters.
clokep Mar 8, 2022
16cf9c0
Properly invalidate edits when redacting a parent event.
clokep Mar 8, 2022
252c851
Merge remote-tracking branch 'origin/develop' into clokep/relation-re…
clokep Mar 8, 2022
6c58509
Allow for ignoring some arguments when caching.
clokep Mar 9, 2022
a4a1c31
Require caches to use kwargs.
clokep Mar 9, 2022
18805d0
Make a function private.
clokep Mar 9, 2022
020f0c6
Add a test for keyword arguments.
clokep Mar 9, 2022
f69b643
Keyword-only arguments are not supported.
clokep Mar 9, 2022
8fa311e
Newsfragment
clokep Mar 9, 2022
2d41a6c
Add a comment.
clokep Mar 9, 2022
4460e92
Properly handle passing a uncached parameter as an arg.
clokep Mar 9, 2022
02fc051
Remove debugging code.
clokep Mar 9, 2022
ad0a3a6
Merge remote-tracking branch 'origin/clokep/uncached-args' into cloke…
clokep Mar 9, 2022
ecbc2d9
Use uncached_args parameter instead of num_args.
clokep Mar 9, 2022
f15a031
Move cache invalidation to a single place.
clokep Mar 9, 2022
6b1cf03
Merge remote-tracking branch 'origin/develop' into clokep/relation-re…
clokep Mar 9, 2022
f885036
Newsfragment
clokep Mar 9, 2022
d2321b9
Avoid streaming caches which will be invalidated anyway.
clokep Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12130.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug when redacting events with relations.
82 changes: 38 additions & 44 deletions synapse/rest/client/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.storage.relations import AggregationPaginationToken, PaginationChunk
from synapse.storage.relations import AggregationPaginationToken
from synapse.types import JsonDict, StreamToken

if TYPE_CHECKING:
Expand Down Expand Up @@ -82,28 +82,25 @@ async def on_GET(
from_token_str = parse_string(request, "from")
to_token_str = parse_string(request, "to")

if event.internal_metadata.is_redacted():
# If the event is redacted, return an empty list of relations
pagination_chunk = PaginationChunk(chunk=[])
else:
# Return the relations
from_token = None
if from_token_str:
from_token = await StreamToken.from_string(self.store, from_token_str)
to_token = None
if to_token_str:
to_token = await StreamToken.from_string(self.store, to_token_str)

pagination_chunk = await self.store.get_relations_for_event(
event_id=parent_id,
room_id=room_id,
relation_type=relation_type,
event_type=event_type,
limit=limit,
direction=direction,
from_token=from_token,
to_token=to_token,
)
# Return the relations
from_token = None
if from_token_str:
from_token = await StreamToken.from_string(self.store, from_token_str)
to_token = None
if to_token_str:
to_token = await StreamToken.from_string(self.store, to_token_str)

pagination_chunk = await self.store.get_relations_for_event(
event_id=parent_id,
room_id=room_id,
relation_type=relation_type,
event_type=event_type,
limit=limit,
direction=direction,
from_token=from_token,
to_token=to_token,
event=event,
)

events = await self.store.get_events_as_list(
[c["event_id"] for c in pagination_chunk.chunk]
Expand Down Expand Up @@ -193,27 +190,23 @@ async def on_GET(
from_token_str = parse_string(request, "from")
to_token_str = parse_string(request, "to")

if event.internal_metadata.is_redacted():
# If the event is redacted, return an empty list of relations
pagination_chunk = PaginationChunk(chunk=[])
else:
# Return the relations
from_token = None
if from_token_str:
from_token = AggregationPaginationToken.from_string(from_token_str)

to_token = None
if to_token_str:
to_token = AggregationPaginationToken.from_string(to_token_str)

pagination_chunk = await self.store.get_aggregation_groups_for_event(
event_id=parent_id,
room_id=room_id,
event_type=event_type,
limit=limit,
from_token=from_token,
to_token=to_token,
)
# Return the relations
from_token = None
if from_token_str:
from_token = AggregationPaginationToken.from_string(from_token_str)

to_token = None
if to_token_str:
to_token = AggregationPaginationToken.from_string(to_token_str)

pagination_chunk = await self.store.get_aggregation_groups_for_event(
event_id=parent_id,
room_id=room_id,
event_type=event_type,
limit=limit,
from_token=from_token,
to_token=to_token,
)

return 200, await pagination_chunk.to_dict(self.store)

Expand Down Expand Up @@ -302,6 +295,7 @@ async def on_GET(
limit=limit,
from_token=from_token,
to_token=to_token,
event=event,
)

events = await self.store.get_events_as_list(
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ def _invalidate_caches_for_event(

if redacts:
self._invalidate_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
self.get_relations_for_event.invalidate((redacts,))
self.get_applicable_edit.invalidate((redacts,))

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
Expand Down
13 changes: 10 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1812,9 +1812,7 @@ def _handle_event_relations(
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))

if rel_type == RelationTypes.THREAD:
txn.call_after(
self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
)
txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
Expand Down Expand Up @@ -1980,6 +1978,15 @@ def _handle_redact_relations(
txn, self.store.get_thread_participated, (redacted_relates_to,)
)

# Caches which might leak edits must be invalidated for the event being
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels duplicative to what happens in _invalidate_caches_for_event, but I think that doesn't run on the main process?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_invalidate_caches_for_event happens on the main process? Otherwise the event caches on the main process would be wrong?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any place that would be called on the main process, it only gets called via CacheInvalidationWorkerStore.process_replication_rows (either directly or via _process_event_stream_row).

I see some calls directly to _invalidate_get_event_cache though? These lines should actually be next to that, most likely...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a change in f15a031 which consolidates the logic a bit, which gives apples-to-apples comparison of what to clear when redacting an event.

# redacted.
self.store._invalidate_cache_and_stream(
txn, self.store.get_relations_for_event, (redacted_event_id,)
)
self.store._invalidate_cache_and_stream(
txn, self.store.get_applicable_edit, (redacted_event_id,)
)

self.db_pool.simple_delete_txn(
txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
Expand Down
61 changes: 33 additions & 28 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(

self._msc3440_enabled = hs.config.experimental.msc3440_enabled

@cached(tree=True)
@cached(num_args=9, tree=True)
clokep marked this conversation as resolved.
Show resolved Hide resolved
async def get_relations_for_event(
self,
event_id: str,
Expand All @@ -103,6 +103,7 @@ async def get_relations_for_event(
direction: str = "b",
from_token: Optional[StreamToken] = None,
to_token: Optional[StreamToken] = None,
event: Optional[EventBase] = None,
) -> PaginationChunk:
"""Get a list of relations for an event, ordered by topological ordering.

Expand All @@ -117,14 +118,20 @@ async def get_relations_for_event(
oldest first (`"f"`).
from_token: Fetch rows from the given token, or from the start if None.
to_token: Fetch rows up to the given token, or up to the end if None.
event: The event corresponding to event_id. This *must* be provided.

Returns:
List of event IDs that match relations requested. The rows are of
the form `{"event_id": "..."}`.
"""
# We don't use `event_id` directly; it's there so that we can cache based on
# it. The `event_id` must match the `event.event_id`.
assert event is not None
assert event.event_id == event_id

where_clause = ["relates_to_id = ?", "room_id = ?"]
where_args: List[Union[str, int]] = [event_id, room_id]
where_args: List[Union[str, int]] = [event.event_id, room_id]
is_redacted = event.internal_metadata.is_redacted()

if relation_type is not None:
where_clause.append("relation_type = ?")
Expand Down Expand Up @@ -157,7 +164,7 @@ async def get_relations_for_event(
order = "ASC"

sql = """
SELECT event_id, topological_ordering, stream_ordering
SELECT event_id, relation_type, topological_ordering, stream_ordering
FROM event_relations
INNER JOIN events USING (event_id)
WHERE %s
Expand All @@ -178,9 +185,12 @@ def _get_recent_references_for_event_txn(
last_stream_id = None
events = []
for row in txn:
events.append({"event_id": row[0]})
last_topo_id = row[1]
last_stream_id = row[2]
# Do not include edits for redacted events as they leak event
# content.
if not is_redacted or row[1] != RelationTypes.REPLACE:
events.append({"event_id": row[0]})
last_topo_id = row[2]
last_stream_id = row[3]

# If there are more events, generate the next pagination key.
next_token = None
Expand Down Expand Up @@ -776,7 +786,7 @@ async def _get_bundled_aggregation_for_event(
)

references = await self.get_relations_for_event(
event_id, room_id, RelationTypes.REFERENCE, direction="f"
event_id, room_id, RelationTypes.REFERENCE, direction="f", event=event
)
if references.chunk:
aggregations.references = await references.to_dict(cast("DataStore", self))
Expand All @@ -797,41 +807,36 @@ async def get_bundled_aggregations(
A map of event ID to the bundled aggregation for the event. Not all
events may have bundled aggregations in the results.
"""
# The already processed event IDs. Tracked separately from the result
# since the result omits events which do not have bundled aggregations.
seen_event_ids = set()

# State events and redacted events do not get bundled aggregations.
events = [
event
for event in events
if not event.is_state() and not event.internal_metadata.is_redacted()
]
# De-duplicate events by ID to handle the same event requested multiple times.
#
# State events do not get bundled aggregations.
events_by_id = {
event.event_id: event for event in events if not event.is_state()
}

# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}

# Fetch other relations per event.
for event in events:
# De-duplicate events by ID to handle the same event requested multiple
# times. The caches that _get_bundled_aggregation_for_event use should
# capture this, but best to reduce work.
if event.event_id in seen_event_ids:
continue
seen_event_ids.add(event.event_id)

for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
if event_result:
results[event.event_id] = event_result

# Fetch any edits.
edits = await self._get_applicable_edits(seen_event_ids)
# Fetch any edits (but not for redacted events).
edits = await self._get_applicable_edits(
[
event_id
for event_id, event in events_by_id.items()
if not event.internal_metadata.is_redacted()
]
)
for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit

# Fetch thread summaries.
if self._msc3440_enabled:
summaries = await self._get_thread_summaries(seen_event_ids)
summaries = await self._get_thread_summaries(events_by_id.keys())
# Only fetch participated for a limited selection based on what had
# summaries.
participated = await self._get_threads_participated(
Expand Down
45 changes: 40 additions & 5 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,12 +1475,13 @@ def test_redact_parent_edit(self) -> None:
self.assertEqual(relations, {})

def test_redact_parent_annotation(self) -> None:
"""Test that annotations of an event are redacted when the original event
"""Test that annotations of an event are viewable when the original event
is redacted.
"""
# Add a relation
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
self.assertEqual(200, channel.code, channel.json_body)
related_event_id = channel.json_body["event_id"]

# The relations should exist.
event_ids, relations = self._make_relation_requests()
Expand All @@ -1494,11 +1495,45 @@ def test_redact_parent_annotation(self) -> None:
# Redact the original event.
self._redact(self.parent_id)

# The relations are not returned.
# The relations are returned.
event_ids, relations = self._make_relation_requests()
self.assertEqual(event_ids, [])
self.assertEqual(relations, {})
self.assertEquals(event_ids, [related_event_id])
self.assertEquals(
relations["m.annotation"],
{"chunk": [{"type": "m.reaction", "key": "👍", "count": 1}]},
)

# There's nothing to aggregate.
chunk = self._get_aggregations()
self.assertEqual(chunk, [])
self.assertEqual(chunk, [{"count": 1, "key": "👍", "type": "m.reaction"}])

@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
def test_redact_parent_thread(self) -> None:
"""
Test that thread replies are still available when the root event is redacted.
"""
channel = self._send_relation(
RelationTypes.THREAD,
EventTypes.Message,
content={"body": "reply 1", "msgtype": "m.text"},
)
self.assertEqual(200, channel.code, channel.json_body)
related_event_id = channel.json_body["event_id"]

# Redact the root event of the thread.
self._redact(self.parent_id)

# The unredacted relation should still exist.
event_ids, relations = self._make_relation_requests()
self.assertEquals(len(event_ids), 1)
self.assertDictContainsSubset(
{
"count": 1,
"current_user_participated": True,
},
relations[RelationTypes.THREAD],
)
self.assertEqual(
relations[RelationTypes.THREAD]["latest_event"]["event_id"],
related_event_id,
)