Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Prevent historical state from being pushed to an application service …
Browse files Browse the repository at this point in the history
…via `/transactions` (MSC2716) (#11265)

Mark historical state from the MSC2716 `/batch_send` endpoint as `historical` which makes it `backfilled` and have a negative `stream_ordering` so it doesn't get queried by `/transactions`.

Fix #11241

Complement tests: matrix-org/complement#221
  • Loading branch information
MadLittleMods authored Nov 18, 2021
1 parent 92b7538 commit 7ffddd8
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog.d/11265.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical state events from being pushed to an application service via `/transactions`.
23 changes: 21 additions & 2 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,32 @@ async def push_bulk(
json_body=body,
args={"access_token": service.hs_token},
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"push_bulk to %s succeeded! events=%s",
uri,
[event.get("event_id") for event in events],
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
return True
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
logger.warning(
"push_bulk to %s received code=%s msg=%s",
uri,
e.code,
e.msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
logger.warning(
"push_bulk to %s threw exception(%s) %s args=%s",
uri,
type(ex).__name__,
ex,
ex.args,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
failed_transactions_counter.labels(service.id).inc()
return False

Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ async def persist_state_events_at_start(
action=membership,
content=event_dict["content"],
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
Expand All @@ -240,6 +241,7 @@ async def persist_state_events_at_start(
),
event_dict,
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
Expand Down
15 changes: 15 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ async def _local_membership_update(
content: Optional[dict] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
) -> Tuple[str, int]:
"""
Internal membership update function to get an existing event or create
Expand All @@ -293,6 +294,9 @@ async def _local_membership_update(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
Returns:
Tuple of event ID and stream ordering position
Expand Down Expand Up @@ -337,6 +341,7 @@ async def _local_membership_update(
auth_event_ids=auth_event_ids,
require_consent=require_consent,
outlier=outlier,
historical=historical,
)

prev_state_ids = await context.get_prev_state_ids()
Expand Down Expand Up @@ -433,6 +438,7 @@ async def update_membership(
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
Expand All @@ -454,6 +460,9 @@ async def update_membership(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Expand Down Expand Up @@ -487,6 +496,7 @@ async def update_membership(
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
historical=historical,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
)
Expand All @@ -507,6 +517,7 @@ async def update_membership_locked(
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
Expand All @@ -530,6 +541,9 @@ async def update_membership_locked(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Expand Down Expand Up @@ -657,6 +671,7 @@ async def update_membership_locked(
content=content,
require_consent=require_consent,
outlier=outlier,
historical=historical,
)

latest_event_ids = await self.store.get_prev_events_for_room(room_id)
Expand Down

0 comments on commit 7ffddd8

Please sign in to comment.