-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Compute new chunks for new events #3240
Changes from all commits
13dbcaf
6c1d13a
1810cc3
1cdd0d3
ecd4931
f687d8f
9e1d3f1
3847313
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
from twisted.internet import defer | ||
|
||
from synapse.storage.events_worker import EventsWorkerStore | ||
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore | ||
from synapse.util.async import ObservableDeferred | ||
from synapse.util.frozenutils import frozendict_json_encoder | ||
from synapse.util.logcontext import ( | ||
|
@@ -1019,13 +1020,19 @@ def _update_outliers_txn(self, txn, events_and_contexts): | |
} | ||
) | ||
|
||
sql = ( | ||
"UPDATE events SET outlier = ?" | ||
" WHERE event_id = ?" | ||
chunk_id, _ = self._insert_into_chunk_txn( | ||
txn, event.room_id, event.event_id, | ||
[eid for eid, _ in event.prev_events], | ||
) | ||
txn.execute( | ||
sql, | ||
(False, event.event_id,) | ||
|
||
self._simple_update_txn( | ||
txn, | ||
table="events", | ||
keyvalues={"event_id": event.event_id}, | ||
updatevalues={ | ||
"outlier": False, | ||
"chunk_id": chunk_id, | ||
}, | ||
) | ||
|
||
# Update the event_backward_extremities table now that this | ||
|
@@ -1108,12 +1115,21 @@ def event_dict(event): | |
], | ||
) | ||
|
||
self._simple_insert_many_txn( | ||
txn, | ||
table="events", | ||
values=[ | ||
{ | ||
for event, _ in events_and_contexts: | ||
if event.internal_metadata.is_outlier(): | ||
chunk_id, _topo = None, 0 | ||
else: | ||
chunk_id, _topo = self._insert_into_chunk_txn( | ||
txn, event.room_id, event.event_id, | ||
[eid for eid, _ in event.prev_events], | ||
) | ||
|
||
self._simple_insert_txn( | ||
txn, | ||
table="events", | ||
values={ | ||
"stream_ordering": event.internal_metadata.stream_ordering, | ||
"chunk_id": chunk_id, | ||
"topological_ordering": event.depth, | ||
"depth": event.depth, | ||
"event_id": event.event_id, | ||
|
@@ -1129,10 +1145,8 @@ def event_dict(event): | |
"url" in event.content | ||
and isinstance(event.content["url"], basestring) | ||
), | ||
} | ||
for event, _ in events_and_contexts | ||
], | ||
) | ||
}, | ||
) | ||
|
||
def _store_rejected_events_txn(self, txn, events_and_contexts): | ||
"""Add rows to the 'rejections' table for received events which were | ||
|
@@ -1344,6 +1358,177 @@ def _store_redaction(self, txn, event): | |
(event.event_id, event.redacts) | ||
) | ||
|
||
def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids): | ||
"""Computes the chunk ID and topological ordering for an event and | ||
handles updating chunk_graph table. | ||
|
||
Args: | ||
txn, | ||
room_id (str) | ||
event_id (str) | ||
prev_event_ids (list[str]) | ||
|
||
Returns: | ||
tuple[int, int]: Returns the chunk_id, topological_ordering for | ||
the event | ||
""" | ||
|
||
# We calculate the chunk for an event using the following rules: | ||
# | ||
# 1. If all prev events have the same chunk ID then use that chunk ID | ||
# 2. If we have none of the prev events but do have events pointing to | ||
# the event, then we use their chunk ID if: | ||
# - They're all in the same chunk, and | ||
# - All their prev events match the events being inserted | ||
# 3. Otherwise, create a new chunk and use that | ||
|
||
# Set of chunks that the event refers to. Includes None if there were | ||
# prev events that we don't have (or don't have a chunk for) | ||
prev_chunk_ids = set() | ||
|
||
for eid in prev_event_ids: | ||
chunk_id = self._simple_select_one_onecol_txn( | ||
txn, | ||
table="events", | ||
keyvalues={"event_id": eid}, | ||
retcol="chunk_id", | ||
allow_none=True, | ||
) | ||
|
||
prev_chunk_ids.add(chunk_id) | ||
|
||
forward_events = self._simple_select_onecol_txn( | ||
txn, | ||
table="event_edges", | ||
keyvalues={ | ||
"prev_event_id": event_id, | ||
"is_state": False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess there isn't much of a reason to leave the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hrm. I guess I misunderstood what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed |
||
}, | ||
retcol="event_id", | ||
) | ||
|
||
# Set of chunks that refer to this event. | ||
forward_chunk_ids = set() | ||
|
||
# All the prev_events of events in `forward_events`. | ||
# Note that this will include the current event_id. | ||
sibling_events = set() | ||
for eid in forward_events: | ||
chunk_id = self._simple_select_one_onecol_txn( | ||
txn, | ||
table="events", | ||
keyvalues={"event_id": eid}, | ||
retcol="chunk_id", | ||
allow_none=True, | ||
) | ||
|
||
if chunk_id is not None: | ||
# chunk_id can be None if it's an outlier | ||
forward_chunk_ids.add(chunk_id) | ||
|
||
pes = self._simple_select_onecol_txn( | ||
txn, | ||
table="event_edges", | ||
keyvalues={ | ||
"event_id": eid, | ||
"is_state": False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, why |
||
}, | ||
retcol="prev_event_id", | ||
) | ||
|
||
sibling_events.update(pes) | ||
|
||
table = ChunkDBOrderedListStore( | ||
txn, room_id, self.clock, | ||
) | ||
|
||
# If there is only one previous chunk (and that isn't None), then this | ||
# satisfies condition one. | ||
if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids: | ||
chunk_id = list(prev_chunk_ids)[0] | ||
|
||
# This event is being inserted at the end of the chunk | ||
new_topo = self._simple_select_one_onecol_txn( | ||
txn, | ||
table="events", | ||
keyvalues={ | ||
"room_id": room_id, | ||
"chunk_id": chunk_id, | ||
}, | ||
retcol="MAX(topological_ordering)", | ||
) | ||
new_topo += 1 | ||
|
||
# If there is only one forward chunk and only one sibling event (which | ||
# would be the given event), then this satisfies condition two. | ||
elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1: | ||
chunk_id = list(forward_chunk_ids)[0] | ||
|
||
# This event is being inserted at the start of the chunk | ||
new_topo = self._simple_select_one_onecol_txn( | ||
txn, | ||
table="events", | ||
keyvalues={ | ||
"room_id": room_id, | ||
"chunk_id": chunk_id, | ||
}, | ||
retcol="MIN(topological_ordering)", | ||
) | ||
new_topo -= 1 | ||
else: | ||
chunk_id = self._chunk_id_gen.get_next() | ||
new_topo = 0 | ||
|
||
# We've generated a new chunk, so we have to tell the | ||
# ChunkDBOrderedListStore about that. | ||
table.add_node(chunk_id) | ||
|
||
# We need to now update the database with any new edges between chunks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it worth trying to optimise this, depending on which path we've taken above? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I see how? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. example 1: if it's a new chunk, it's not going to have any existing edges There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point well made |
||
current_prev_ids = self._simple_select_onecol_txn( | ||
txn, | ||
table="chunk_graph", | ||
keyvalues={ | ||
"chunk_id": chunk_id, | ||
}, | ||
retcol="prev_id", | ||
) | ||
|
||
current_forward_ids = self._simple_select_onecol_txn( | ||
txn, | ||
table="chunk_graph", | ||
keyvalues={ | ||
"prev_id": chunk_id, | ||
}, | ||
retcol="chunk_id", | ||
) | ||
|
||
for pid in prev_chunk_ids: | ||
if pid is not None and pid not in current_prev_ids and pid != chunk_id: | ||
# Note that the edge direction is reversed than what you might | ||
# expect. See ChunkDBOrderedListStore for more details. | ||
table.add_edge(pid, chunk_id) | ||
|
||
for fid in forward_chunk_ids: | ||
# Note that the edge direction is reversed than what you might | ||
# expect. See ChunkDBOrderedListStore for more details. | ||
if fid not in current_forward_ids and fid != chunk_id: | ||
table.add_edge(chunk_id, fid) | ||
|
||
# We now need to update the backwards extremities for the chunks. | ||
|
||
txn.executemany(""" | ||
INSERT INTO chunk_backwards_extremities (chunk_id, event_id) | ||
SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events) | ||
""", [(chunk_id, eid, eid) for eid in prev_event_ids]) | ||
|
||
self._simple_delete_txn( | ||
txn, | ||
table="chunk_backwards_extremities", | ||
keyvalues={"event_id": event_id}, | ||
) | ||
|
||
return chunk_id, new_topo | ||
|
||
@defer.inlineCallbacks | ||
def have_events_in_timeline(self, event_ids): | ||
"""Given a list of event ids, check if we have already processed and | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as a general comment in this function, all these db hits look sloooow. do you plan to go via caches at some point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The queries should all hit indices and so should be fast, though yeah, the number of them about does mean even the RTT will start adding up. I'm not sure how much caches will help tbh, as for most cases what we fetch will change each time (though we could possible prefill the caches).
Really, I'd quite like to split a lot of the persist_event logic out into per room logic, so that if something goes slow for a particular room it won't block events in other rooms being persisted. I.e., when persisting an event it first gets added to a per room queue to have chunk/current_state/etc calculated, and then that result gets fed into the persist event queue. Maybe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup I'm worried about the RTT, and expecting that we ought to have prefilled caches in the common case.