
Add initial power level event to batch of bulk persisted events when creating a new room. #14228

Merged 16 commits on Oct 21, 2022
1 change: 1 addition & 0 deletions changelog.d/14228.misc
@@ -0,0 +1 @@
+Add initial power level event to batch of bulk persisted events when creating a new room.
4 changes: 3 additions & 1 deletion synapse/handlers/federation.py
@@ -1017,7 +1017,9 @@ async def on_invite_request(

         context = EventContext.for_outlier(self._storage_controllers)

-        await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context)
+        await self._bulk_push_rule_evaluator.action_for_events_by_user(
+            [(event, context)]
+        )
         try:
             await self._federation_event_handler.persist_events_and_notify(
                 event.room_id, [(event, context)]
4 changes: 2 additions & 2 deletions synapse/handlers/federation_event.py
@@ -2171,8 +2171,8 @@ async def _run_push_actions_and_persist_event(
                 min_depth,
             )
         else:
-            await self._bulk_push_rule_evaluator.action_for_event_by_user(
-                event, context
+            await self._bulk_push_rule_evaluator.action_for_events_by_user(
+                [(event, context)]
             )

         try:
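Both federation handlers make the same mechanical change: the evaluator's entry point now takes a list of (event, context) pairs rather than a single pair, so call sites that hold one event wrap it in a one-element list. A minimal sketch of the new call shape; the stage_push_actions helper is illustrative, not part of the diff:

```python
from typing import List, Tuple

from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator


async def stage_push_actions(
    evaluator: BulkPushRuleEvaluator,
    events_and_context: List[Tuple[EventBase, EventContext]],
) -> None:
    # Before this change each caller invoked
    # evaluator.action_for_event_by_user(event, context) once per event.
    # The batched entry point takes the whole list in one call; a single
    # event is simply a one-element batch.
    await evaluator.action_for_events_by_user(events_and_context)
```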
14 changes: 3 additions & 11 deletions synapse/handlers/message.py
@@ -1433,17 +1433,9 @@ async def _persist_events(
             a room that has been un-partial stated.
         """

-        for event, context in events_and_context:
-            # Skip push notification actions for historical messages
-            # because we don't want to notify people about old history back in time.
-            # The historical messages also do not have the proper `context.current_state_ids`
-            # and `state_groups` because they have `prev_events` that aren't persisted yet
-            # (historical messages persisted in reverse-chronological order).
-            if not event.internal_metadata.is_historical():
-                with opentracing.start_active_span("calculate_push_actions"):
-                    await self._bulk_push_rule_evaluator.action_for_event_by_user(
-                        event, context
-                    )
+        await self._bulk_push_rule_evaluator.action_for_events_by_user(
+            events_and_context
+        )

         try:
             # If we're a worker we need to hit out to the master.
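The per-event loop, the historical-message check, and the calculate_push_actions tracing span all leave the handler; the batch goes straight to the evaluator, which now performs the skip itself (see the bulk_push_rule_evaluator.py hunk below). A sketch of the guard applied per event, using only the two internal_metadata predicates that appear in this diff; wants_push_actions is an illustrative name, not a function in the codebase:

```python
from synapse.events import EventBase


def wants_push_actions(event: EventBase) -> bool:
    """True if push rules should be evaluated for this event.

    Mirrors the early return in _action_for_event_by_user: non-notifiable
    events and backfilled "historical" events are skipped.
    """
    return (
        event.internal_metadata.is_notifiable()
        and not event.internal_metadata.is_historical()
    )
```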
39 changes: 10 additions & 29 deletions synapse/handlers/room.py
@@ -1055,9 +1055,6 @@ async def _send_events_for_new_room(
         event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
         depth = 1

-        # the last event sent/persisted to the db
-        last_sent_event_id: Optional[str] = None
-
         # the most recently created event
         prev_event: List[str] = []
         # a map of event types, state keys -> event_ids. We collect these mappings this as events are
@@ -1102,26 +1099,6 @@ async def create_event(

             return new_event, new_context

-        async def send(
-            event: EventBase,
-            context: synapse.events.snapshot.EventContext,
-            creator: Requester,
-        ) -> int:
-            nonlocal last_sent_event_id
-
-            ev = await self.event_creation_handler.handle_new_client_event(
-                requester=creator,
-                events_and_context=[(event, context)],
-                ratelimit=False,
-                ignore_shadow_ban=True,
-            )
-
-            last_sent_event_id = ev.event_id
-
-            # we know it was persisted, so must have a stream ordering
-            assert ev.internal_metadata.stream_ordering
-            return ev.internal_metadata.stream_ordering
-
         try:
             config = self._presets_dict[preset_config]
         except KeyError:
@@ -1135,10 +1112,14 @@ async def send(
         )

         logger.debug("Sending %s in new room", EventTypes.Member)
-        await send(creation_event, creation_context, creator)
+        ev = await self.event_creation_handler.handle_new_client_event(
+            requester=creator,
+            events_and_context=[(creation_event, creation_context)],
+            ratelimit=False,
+            ignore_shadow_ban=True,
+        )
+        last_sent_event_id = ev.event_id

-        # Room create event must exist at this point
-        assert last_sent_event_id is not None
         member_event_id, _ = await self.room_member_handler.update_membership(
             creator,
             creator.user,
@@ -1157,6 +1138,7 @@
         depth += 1
         state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id

+        events_to_send = []
         # We treat the power levels override specially as this needs to be one
         # of the first events that get sent into a room.
         pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
@@ -1165,7 +1147,7 @@
                 EventTypes.PowerLevels, pl_content, False
             )
             current_state_group = power_context._state_group
-            await send(power_event, power_context, creator)
+            events_to_send.append((power_event, power_context))
         else:
             power_level_content: JsonDict = {
                 "users": {creator_id: 100},
@@ -1214,9 +1196,8 @@ async def send(
                 False,
             )
             current_state_group = pl_context._state_group
-            await send(pl_event, pl_context, creator)
+            events_to_send.append((pl_event, pl_context))

-        events_to_send = []
         if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
             room_alias_event, room_alias_context = await create_event(
                 EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
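The net effect on room creation: instead of persisting each initial event as soon as it is built via the now-removed send() helper, the handler appends (event, context) pairs to events_to_send, with the power-level event first, and hands the whole list to handle_new_client_event in a single call later in the function (below the portion of the diff shown here). A minimal sketch of that pattern; send_initial_room_events is a hypothetical wrapper, not a function in the handler:

```python
from typing import List, Tuple

from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.message import EventCreationHandler
from synapse.types import Requester


async def send_initial_room_events(
    event_creation_handler: EventCreationHandler,
    creator: Requester,
    events_to_send: List[Tuple[EventBase, EventContext]],
) -> None:
    # The power-level event is appended to the batch first so that the events
    # created after it can be evaluated against it, then everything is
    # persisted with one handle_new_client_event call instead of one call
    # per event.
    await event_creation_handler.handle_new_client_event(
        requester=creator,
        events_and_context=events_to_send,
        ratelimit=False,
        ignore_shadow_ban=True,
    )
```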
74 changes: 61 additions & 13 deletions synapse/push/bulk_push_rule_evaluator.py
@@ -165,8 +165,21 @@ async def _get_rules_for_event(
         return rules_by_user

     async def _get_power_levels_and_sender_level(
-        self, event: EventBase, context: EventContext
+        self,
+        event: EventBase,
+        context: EventContext,
+        event_id_to_event: Mapping[str, EventBase],
     ) -> Tuple[dict, Optional[int]]:
+        """
+        Given an event and an event context, get the power level event relevant to the event
+        and the power level of the sender of the event.
+        Args:
+            event: event to check
+            context: context of event to check
+            event_id_to_event: a mapping of event_id to event for a set of events being
+                batch persisted. This is needed as the sought-after power level event may
+                be in this batch rather than the DB
+        """
         # There are no power levels and sender levels possible to get from outlier
         if event.internal_metadata.is_outlier():
             return {}, None
@@ -177,15 +190,26 @@ async def _get_power_levels_and_sender_level(
         )
         pl_event_id = prev_state_ids.get(POWER_KEY)

+        # fastpath: if there's a power level event, that's all we need, and
+        # not having a power level event is an extreme edge case
         if pl_event_id:
-            # fastpath: if there's a power level event, that's all we need, and
-            # not having a power level event is an extreme edge case
-            auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
+            # Get the power level event from the batch, or fall back to the database.
+            pl_event = event_id_to_event.get(pl_event_id)
+            if pl_event:
+                auth_events = {POWER_KEY: pl_event}
+            else:
+                auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
         else:
             auth_events_ids = self._event_auth_handler.compute_auth_events(
                 event, prev_state_ids, for_verification=False
             )
             auth_events_dict = await self.store.get_events(auth_events_ids)
+            # Some needed auth events might be in the batch, combine them with those
+            # fetched from the database.
+            for auth_event_id in auth_events_ids:
+                auth_event = event_id_to_event.get(auth_event_id)
+                if auth_event:
+                    auth_events_dict[auth_event_id] = auth_event
             auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}

         sender_level = get_user_power_level(event.sender, auth_events)
@@ -194,16 +218,38 @@ async def _get_power_levels_and_sender_level(

         return pl_event.content if pl_event else {}, sender_level

-    @measure_func("action_for_event_by_user")
-    async def action_for_event_by_user(
-        self, event: EventBase, context: EventContext
+    async def action_for_events_by_user(
+        self, events_and_context: List[Tuple[EventBase, EventContext]]
     ) -> None:
-        """Given an event and context, evaluate the push rules, check if the message
-        should increment the unread count, and insert the results into the
-        event_push_actions_staging table.
+        """Given a list of events and their associated contexts, evaluate the push rules
+        for each event, check if the message should increment the unread count, and
+        insert the results into the event_push_actions_staging table.
         """
-        if not event.internal_metadata.is_notifiable():
-            # Push rules for events that aren't notifiable can't be processed by this
+        # For batched events the power level events may not have been persisted yet,
+        # so we pass in the batched events. Thus if the event cannot be found in the
+        # database we can check in the batch.
+        event_id_to_event = {e.event_id: e for e, _ in events_and_context}
+        for event, context in events_and_context:
+            await self._action_for_event_by_user(event, context, event_id_to_event)
+
+    @measure_func("action_for_event_by_user")
+    async def _action_for_event_by_user(
+        self,
+        event: EventBase,
+        context: EventContext,
+        event_id_to_event: Mapping[str, EventBase],
+    ) -> None:
+
+        if (
+            not event.internal_metadata.is_notifiable()
+            or event.internal_metadata.is_historical()
+        ):
+            # Push rules for events that aren't notifiable can't be processed by this and
+            # we want to skip push notification actions for historical messages
+            # because we don't want to notify people about old history back in time.
+            # The historical messages also do not have the proper `context.current_state_ids`
+            # and `state_groups` because they have `prev_events` that aren't persisted yet
+            # (historical messages persisted in reverse-chronological order).
             return

         # Disable counting as unread unless the experimental configuration is
@@ -223,7 +269,9 @@
         (
             power_levels,
             sender_power_level,
-        ) = await self._get_power_levels_and_sender_level(event, context)
+        ) = await self._get_power_levels_and_sender_level(
+            event, context, event_id_to_event
+        )

         # Find the event's thread ID.
         relation = relation_from_event(event)
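The core of the change is the event_id_to_event map: when a batch is persisted, the power-level event (or another auth event) that a later event in the batch depends on may itself be part of the batch and therefore not yet readable from the database, so the evaluator builds the map once per batch and consults it before falling back to the store. A standalone sketch of that lookup pattern; get_event_for_push_rules and the untyped store parameter are illustrative, not names from the diff:

```python
from typing import Mapping, Optional

from synapse.events import EventBase


async def get_event_for_push_rules(
    store,  # any object exposing an async get_event(event_id), e.g. the datastore
    event_id_to_event: Mapping[str, EventBase],
    event_id: str,
) -> Optional[EventBase]:
    # Prefer an event that is part of the batch currently being persisted;
    # it will not be in the database yet.
    event = event_id_to_event.get(event_id)
    if event is not None:
        return event
    # Otherwise read it from the store, as the evaluator did before this change.
    return await store.get_event(event_id)
```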
2 changes: 1 addition & 1 deletion tests/push/test_bulk_push_rule_evaluator.py
@@ -71,4 +71,4 @@ def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None:

         bulk_evaluator = BulkPushRuleEvaluator(self.hs)
         # should not raise
-        self.get_success(bulk_evaluator.action_for_event_by_user(event, context))
+        self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)]))
2 changes: 1 addition & 1 deletion tests/replication/_base.py
@@ -371,7 +371,7 @@ def make_worker_hs(
             config=worker_hs.config.server.listeners[0],
             resource=resource,
             server_version_string="1",
-            max_request_body_size=4096,
+            max_request_body_size=8192,
             reactor=self.reactor,
         )