Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Compute new chunks for new events #3240

Merged
merged 8 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def __init__(self, db_conn, hs):
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)
self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")

if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
Expand Down
215 changes: 200 additions & 15 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.internet import defer

from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import (
Expand Down Expand Up @@ -1019,13 +1020,19 @@ def _update_outliers_txn(self, txn, events_and_contexts):
}
)

sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
chunk_id, _ = self._insert_into_chunk_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)
txn.execute(
sql,
(False, event.event_id,)

self._simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event.event_id},
updatevalues={
"outlier": False,
"chunk_id": chunk_id,
},
)

# Update the event_backward_extremities table now that this
Expand Down Expand Up @@ -1108,12 +1115,21 @@ def event_dict(event):
],
)

self._simple_insert_many_txn(
txn,
table="events",
values=[
{
for event, _ in events_and_contexts:
if event.internal_metadata.is_outlier():
chunk_id, _topo = None, 0
else:
chunk_id, _topo = self._insert_into_chunk_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)

self._simple_insert_txn(
txn,
table="events",
values={
"stream_ordering": event.internal_metadata.stream_ordering,
"chunk_id": chunk_id,
"topological_ordering": event.depth,
"depth": event.depth,
"event_id": event.event_id,
Expand All @@ -1129,10 +1145,8 @@ def event_dict(event):
"url" in event.content
and isinstance(event.content["url"], basestring)
),
}
for event, _ in events_and_contexts
],
)
},
)

def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
Expand Down Expand Up @@ -1344,6 +1358,177 @@ def _store_redaction(self, txn, event):
(event.event_id, event.redacts)
)

def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
    """Computes the chunk ID and topological ordering for an event and
    handles updating the chunk_graph table.

    Args:
        txn: The database transaction cursor.
        room_id (str): The room the event belongs to.
        event_id (str): The event being inserted.
        prev_event_ids (list[str]): The IDs of the event's prev events.

    Returns:
        tuple[int, int]: The chunk_id and topological_ordering assigned
        to the event.
    """

    # We calculate the chunk for an event using the following rules:
    #
    # 1. If all prev events have the same chunk ID then use that chunk ID
    # 2. If we have none of the prev events but do have events pointing to
    #    the event, then we use their chunk ID if:
    #    - They're all in the same chunk, and
    #    - All their prev events match the events being inserted
    # 3. Otherwise, create a new chunk and use that

    # Set of chunks that the event refers to. Includes None if there were
    # prev events that we don't have (or don't have a chunk for)
    prev_chunk_ids = set()

    for eid in prev_event_ids:
        chunk_id = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={"event_id": eid},
            retcol="chunk_id",
            allow_none=True,
        )

        prev_chunk_ids.add(chunk_id)

    # Events we already have whose prev_events include this event.
    forward_events = self._simple_select_onecol_txn(
        txn,
        table="event_edges",
        keyvalues={
            "prev_event_id": event_id,
            "is_state": False,
        },
        retcol="event_id",
    )

    # Set of chunks that refer to this event.
    forward_chunk_ids = set()

    # All the prev_events of events in `forward_events`.
    # Note that this will include the current event_id.
    sibling_events = set()
    for eid in forward_events:
        chunk_id = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={"event_id": eid},
            retcol="chunk_id",
            allow_none=True,
        )

        if chunk_id is not None:
            # chunk_id can be None if it's an outlier
            forward_chunk_ids.add(chunk_id)

        pes = self._simple_select_onecol_txn(
            txn,
            table="event_edges",
            keyvalues={
                "event_id": eid,
                "is_state": False,
            },
            retcol="prev_event_id",
        )

        sibling_events.update(pes)

    table = ChunkDBOrderedListStore(
        txn, room_id, self.clock,
    )

    # If there is only one previous chunk (and that isn't None), then this
    # satisfies condition one.
    if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
        chunk_id = next(iter(prev_chunk_ids))

        # This event is being inserted at the end of the chunk
        new_topo = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={
                "room_id": room_id,
                "chunk_id": chunk_id,
            },
            retcol="MAX(topological_ordering)",
        )
        new_topo += 1

    # If there is only one forward chunk and only one sibling event (which
    # would be the given event), then this satisfies condition two.
    elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
        chunk_id = next(iter(forward_chunk_ids))

        # This event is being inserted at the start of the chunk
        new_topo = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={
                "room_id": room_id,
                "chunk_id": chunk_id,
            },
            retcol="MIN(topological_ordering)",
        )
        new_topo -= 1
    else:
        chunk_id = self._chunk_id_gen.get_next()
        new_topo = 0

        # We've generated a new chunk, so we have to tell the
        # ChunkDBOrderedListStore about that.
        table.add_node(chunk_id)

    # We need to now update the database with any new edges between chunks
    current_prev_ids = self._simple_select_onecol_txn(
        txn,
        table="chunk_graph",
        keyvalues={
            "chunk_id": chunk_id,
        },
        retcol="prev_id",
    )

    current_forward_ids = self._simple_select_onecol_txn(
        txn,
        table="chunk_graph",
        keyvalues={
            "prev_id": chunk_id,
        },
        retcol="chunk_id",
    )

    for pid in prev_chunk_ids:
        if pid is not None and pid not in current_prev_ids and pid != chunk_id:
            # Note that the edge direction is reversed from what you might
            # expect. See ChunkDBOrderedListStore for more details.
            table.add_edge(pid, chunk_id)

    for fid in forward_chunk_ids:
        # Note that the edge direction is reversed from what you might
        # expect. See ChunkDBOrderedListStore for more details.
        if fid not in current_forward_ids and fid != chunk_id:
            table.add_edge(chunk_id, fid)

    # We now need to update the backwards extremities for the chunks.
    # A prev event we don't yet have is a backwards extremity of this chunk.
    txn.executemany("""
        INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
        SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
    """, [(chunk_id, eid, eid) for eid in prev_event_ids])

    # This event is no longer a backwards extremity of any chunk now that
    # it is being persisted.
    self._simple_delete_txn(
        txn,
        table="chunk_backwards_extremities",
        keyvalues={"event_id": event_id},
    )

    return chunk_id, new_topo

@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
Expand Down