This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

skip some dict munging in event persistence #11560

Merged
merged 1 commit into from Dec 10, 2021
1 change: 1 addition & 0 deletions changelog.d/11560.misc
@@ -0,0 +1 @@
+Minor efficiency improvements in event persistence.
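
The "dict munging" of the title is the building of one dict per row, which the insert helper then has to re-split into column names and value lists. The new API takes the column names once, plus plain rows of values. A rough standalone sketch of the two input shapes (hypothetical rows, not Synapse code):

    # Old shape: one dict per row; the helper must re-derive the column
    # names from the dicts and check that every row has the same keys.
    rows_as_dicts = [
        {"event_id": "$a", "room_id": "!r"},
        {"event_id": "$b", "room_id": "!r"},
    ]

    # New shape: column names stated once, plus an iterable of plain value
    # tuples, so no per-row dict is ever built or taken apart.
    keys = ("event_id", "room_id")
    rows_as_tuples = [("$a", "!r"), ("$b", "!r")]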
59 changes: 54 additions & 5 deletions synapse/storage/database.py
@@ -896,6 +896,9 @@ async def simple_insert_many(
     ) -> None:
         """Executes an INSERT query on the named table.
 
+        The input is given as a list of dicts, with one dict per row.
+        Generally simple_insert_many_values should be preferred for new code.
+
         Args:
             table: string giving the table name
             values: dict of new column names and values for them
@@ -909,6 +912,9 @@ def simple_insert_many_txn(
     ) -> None:
         """Executes an INSERT query on the named table.
 
+        The input is given as a list of dicts, with one dict per row.
+        Generally simple_insert_many_values_txn should be preferred for new code.
+
        Args:
            txn: The transaction to use.
            table: string giving the table name
@@ -933,23 +939,66 @@ def simple_insert_many_txn(
             if k != keys[0]:
                 raise RuntimeError("All items must have the same keys")
 
+        return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
+
+    async def simple_insert_many_values(

Member (Author) commented: incidentally, ideas for better names for this thing are welcome.

+        self,
+        table: str,
+        keys: Collection[str],
+        values: Iterable[Iterable[Any]],
+        desc: str,
+    ) -> None:
+        """Executes an INSERT query on the named table.
+
+        The input is given as a list of rows, where each row is a list of values.
+        (Actually any iterable is fine.)
+
+        Args:
+            table: string giving the table name
+            keys: list of column names
+            values: for each row, a list of values in the same order as `keys`
+            desc: description of the transaction, for logging and metrics
+        """
+        await self.runInteraction(
+            desc, self.simple_insert_many_values_txn, table, keys, values
+        )
+
+    @staticmethod
+    def simple_insert_many_values_txn(
+        txn: LoggingTransaction,
+        table: str,
+        keys: Collection[str],
+        values: Iterable[Iterable[Any]],
+    ) -> None:
+        """Executes an INSERT query on the named table.
+
+        The input is given as a list of rows, where each row is a list of values.
+        (Actually any iterable is fine.)
+
+        Args:
+            txn: The transaction to use.
+            table: string giving the table name
+            keys: list of column names
+            values: for each row, a list of values in the same order as `keys`
+        """
+
         if isinstance(txn.database_engine, PostgresEngine):
             # We use `execute_values` as it can be a lot faster than `execute_batch`,
             # but it's only available on postgres.
             sql = "INSERT INTO %s (%s) VALUES ?" % (
                 table,
-                ", ".join(k for k in keys[0]),
+                ", ".join(k for k in keys),
             )
 
-            txn.execute_values(sql, vals, fetch=False)
+            txn.execute_values(sql, values, fetch=False)
         else:
             sql = "INSERT INTO %s (%s) VALUES(%s)" % (
                 table,
-                ", ".join(k for k in keys[0]),
-                ", ".join("?" for _ in keys[0]),
+                ", ".join(k for k in keys),
+                ", ".join("?" for _ in keys),
             )
 
-            txn.execute_batch(sql, vals)
+            txn.execute_batch(sql, values)
 
     async def simple_upsert(
         self,
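
For readers outside the codebase: the fast path corresponds to psycopg2's `execute_values` helper, which folds all rows into a single multi-row INSERT, while the fallback issues the statement once per row through the driver's batching API. A minimal standalone sketch of the same idea, using sqlite3 and a hypothetical `events` table rather than Synapse's wrappers (the postgres half is shown as a comment since it needs a live server):

    import sqlite3

    keys = ("event_id", "room_id")
    rows = [("$a", "!room"), ("$b", "!room")]

    # Fallback path: one placeholder per column, executed per row by the
    # driver's batch API.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT, room_id TEXT)")
    sql = "INSERT INTO events (%s) VALUES (%s)" % (
        ", ".join(keys),
        ", ".join("?" for _ in keys),
    )
    conn.executemany(sql, rows)
    print(conn.execute("SELECT count(*) FROM events").fetchone())  # (2,)

    # Postgres fast path: psycopg2.extras.execute_values rewrites a single
    # "VALUES %s" template into one multi-row statement, typically much
    # faster than running the INSERT once per row:
    #
    #     from psycopg2.extras import execute_values
    #     execute_values(cur, "INSERT INTO events (event_id, room_id) VALUES %s", rows)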
114 changes: 59 additions & 55 deletions synapse/storage/databases/main/events.py
@@ -19,6 +19,7 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    Collection,
     Dict,
     Generator,
     Iterable,
@@ -1319,14 +1320,13 @@ def _update_outliers_txn(self, txn, events_and_contexts):
 
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
-    def _store_event_txn(self, txn, events_and_contexts):
+    def _store_event_txn(
+        self,
+        txn: LoggingTransaction,
+        events_and_contexts: Collection[Tuple[EventBase, EventContext]],
+    ) -> None:
         """Insert new events into the event, event_json, redaction and
         state_events tables.
-
-        Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            events_and_contexts (list[(EventBase, EventContext)]): events
-                we are persisting
         """
 
         if not events_and_contexts:
@@ -1339,46 +1339,58 @@ def event_dict(event):
             d.pop("redacted_because", None)
             return d
 
-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_insert_many_values_txn(
             txn,
             table="event_json",
-            values=[
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "internal_metadata": json_encoder.encode(
-                        event.internal_metadata.get_dict()
-                    ),
-                    "json": json_encoder.encode(event_dict(event)),
-                    "format_version": event.format_version,
-                }
+            keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
+            values=(
+                (
+                    event.event_id,
+                    event.room_id,
+                    json_encoder.encode(event.internal_metadata.get_dict()),
+                    json_encoder.encode(event_dict(event)),
+                    event.format_version,
+                )
                 for event, _ in events_and_contexts
-            ],
+            ),
         )

-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_insert_many_values_txn(
             txn,
             table="events",
-            values=[
-                {
-                    "instance_name": self._instance_name,
-                    "stream_ordering": event.internal_metadata.stream_ordering,
-                    "topological_ordering": event.depth,
-                    "depth": event.depth,
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "type": event.type,
-                    "processed": True,
-                    "outlier": event.internal_metadata.is_outlier(),
-                    "origin_server_ts": int(event.origin_server_ts),
-                    "received_ts": self._clock.time_msec(),
-                    "sender": event.sender,
-                    "contains_url": (
-                        "url" in event.content and isinstance(event.content["url"], str)
-                    ),
-                }
+            keys=(
+                "instance_name",
+                "stream_ordering",
+                "topological_ordering",
+                "depth",
+                "event_id",
+                "room_id",
+                "type",
+                "processed",
+                "outlier",
+                "origin_server_ts",
+                "received_ts",
+                "sender",
+                "contains_url",
+            ),
+            values=(
+                (
+                    self._instance_name,
+                    event.internal_metadata.stream_ordering,
+                    event.depth,  # topological_ordering
+                    event.depth,  # depth
+                    event.event_id,
+                    event.room_id,
+                    event.type,
+                    True,  # processed
+                    event.internal_metadata.is_outlier(),
+                    int(event.origin_server_ts),
+                    self._clock.time_msec(),
+                    event.sender,
+                    "url" in event.content and isinstance(event.content["url"], str),
+                )
                 for event, _ in events_and_contexts
-            ],
+            ),
         )
 
         # If we're persisting an unredacted event we go and ensure
@@ -1397,23 +1409,15 @@ def event_dict(event):
             )
             txn.execute(sql + clause, [False] + args)
 
-        state_events_and_contexts = [
-            ec for ec in events_and_contexts if ec[0].is_state()
-        ]
-
-        state_values = []
-        for event, _ in state_events_and_contexts:
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-
-            state_values.append(vals)
-
-        self.db_pool.simple_insert_many_txn(
-            txn, table="state_events", values=state_values
+        self.db_pool.simple_insert_many_values_txn(
+            txn,
+            table="state_events",
+            keys=("event_id", "room_id", "type", "state_key"),
+            values=(
+                (event.event_id, event.room_id, event.type, event.state_key)
+                for event, _ in events_and_contexts
+                if event.is_state()
+            ),
         )
 
     def _store_rejected_events_txn(self, txn, events_and_contexts):
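
The state_events hunk above shows the same pattern applied end to end: the intermediate `state_events_and_contexts` list and the per-event `vals` dicts disappear, replaced by a single generator of tuples with the `is_state()` filter folded in. A small self-contained illustration of that refactor, with a hypothetical Event class standing in for EventBase:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Event:
        event_id: str
        state_key: Optional[str]

        def is_state(self) -> bool:
            return self.state_key is not None

    events = [Event("$a", ""), Event("$b", None), Event("$c", "alice")]

    # Before: an intermediate filtered list, then a dict per row.
    state_rows_old = []
    for event in events:
        if event.is_state():
            state_rows_old.append(
                {"event_id": event.event_id, "state_key": event.state_key}
            )

    # After: one lazy generator of plain tuples with the filter folded in;
    # nothing is materialized until the database layer iterates it.
    state_rows_new = (
        (event.event_id, event.state_key) for event in events if event.is_state()
    )

    assert list(state_rows_new) == [("$a", ""), ("$c", "alice")]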