Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Detect unknown remote devices and mark cache as stale #6776

Merged
merged 5 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6776.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Detect unknown remote devices and mark cache as stale.
57 changes: 55 additions & 2 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import Any, Dict

from canonicaljson import json

Expand Down Expand Up @@ -65,6 +66,9 @@ def on_direct_to_device_edu(self, origin, content):
logger.warning("Request for keys for non-local user %s", user_id)
raise SynapseError(400, "Not a user here")

if not by_device:
continue

messages_by_device = {
device_id: {
"content": message_content,
Expand All @@ -73,8 +77,11 @@ def on_direct_to_device_edu(self, origin, content):
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
local_messages[user_id] = messages_by_device

yield self._check_for_unknown_devices(
message_type, sender_user_id, by_device
)

stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
Expand All @@ -84,6 +91,52 @@ def on_direct_to_device_edu(self, origin, content):
"to_device_key", stream_id, users=local_messages.keys()
)

@defer.inlineCallbacks
def _check_for_unknown_devices(
self,
message_type: str,
sender_user_id: str,
by_device: Dict[str, Dict[str, Any]],
):
"""Checks inbound device messages for unkown remote devices, and if
found marks the remote cache for the user as stale.
"""

if message_type != "m.room_key_request":
return

# Get the sending device IDs
requesting_device_ids = set()
for message_content in by_device.values():
device_id = message_content.get("requesting_device_id")
requesting_device_ids.add(device_id)

# Check if we are tracking the devices of the remote user.
room_ids = yield self.store.get_rooms_for_user(sender_user_id)
if not room_ids:
logger.info(
"Received device message from remote device we don't"
" share a room with: %s %s",
sender_user_id,
requesting_device_ids,
)
return

# If we are tracking check that we know about the sending
# devices.
cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)

unknown_devices = requesting_device_ids - set(cached_devices)
if unknown_devices:
logger.info(
"Received device message from remote device not in our cache: %s %s",
sender_user_id,
unknown_devices,
)
yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
# TODO: Poke something to start trying to refetch user's
# keys.

@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))
Expand Down
20 changes: 20 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,26 @@ async def _process_received_pdu(
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, room_id)

# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encryption:
device_id = event.content.get("device_id")
if device_id is not None:
cached_devices = await self.store.get_cached_devices_for_user(
event.sender
)
if device_id not in cached_devices:
logger.info(
"Received event from remote device not in our cache: %s %s",
event.sender,
device_id,
)
await self.store.mark_remote_user_device_cache_as_stale(
event.sender
)
# TODO: Poke something to start trying to refetch user's
# keys.

@log_function
async def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ def _invalidate_caches_for_devices(self, token, user_id, destination):
destination, token
)

self._get_cached_devices_for_user.invalidate((user_id,))
self.get_cached_devices_for_user.invalidate((user_id,))
self._get_cached_user_device.invalidate_many((user_id,))
self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
29 changes: 24 additions & 5 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def get_user_devices_from_cache(self, query_list):
device = yield self._get_cached_user_device(user_id, device_id)
results.setdefault(user_id, {})[device_id] = device
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
results[user_id] = yield self.get_cached_devices_for_user(user_id)

set_tag("in_cache", results)
set_tag("not_in_cache", user_ids_not_in_cache)
Expand All @@ -475,12 +475,12 @@ def _get_cached_user_device(self, user_id, device_id):
return db_to_json(content)

@cachedInlineCallbacks()
def _get_cached_devices_for_user(self, user_id):
def get_cached_devices_for_user(self, user_id):
devices = yield self.db.simple_select_list(
table="device_lists_remote_cache",
keyvalues={"user_id": user_id},
retcols=("device_id", "content"),
desc="_get_cached_devices_for_user",
desc="get_cached_devices_for_user",
)
return {
device["device_id"]: db_to_json(device["content"]) for device in devices
Expand Down Expand Up @@ -641,6 +641,18 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids):

return results

def mark_remote_user_device_cache_as_stale(self, user_id: str):
"""Records that the server has reason to believe the cache of the devices
for the remote users is out of date.
"""
return self.db.simple_upsert(
table="device_lists_remote_resync",
keyvalues={"user_id": user_id},
values={},
insertion_values={"added_ts": self._clock.time_msec()},
desc="make_remote_user_device_cache_as_stale",
)


class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
Expand Down Expand Up @@ -887,7 +899,7 @@ def _update_remote_device_list_cache_entry_txn(
)

txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)
Expand All @@ -902,6 +914,13 @@ def _update_remote_device_list_cache_entry_txn(
lock=False,
)

# If we're replacing the remote user's device list cache presumably
# we've done a full resync, so we remove the entry that says we need
# to resync
self.db.simple_delete_txn(
txn, table="device_lists_remote_resync", keyvalues={"user_id": user_id},
)

def update_remote_device_list_cache(self, user_id, devices, stream_id):
"""Replace the entire cache of the remote user's devices.

Expand Down Expand Up @@ -942,7 +961,7 @@ def _update_remote_device_list_cache_txn(self, txn, user_id, devices, stream_id)
],
)

txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Records whether the server thinks that the remote users cached device lists
-- may be out of date (e.g. if we have received a to device message from a
-- device we don't know about).
CREATE TABLE IF NOT EXISTS device_lists_remote_resync (
user_id TEXT NOT NULL,
added_ts BIGINT NOT NULL
);

CREATE UNIQUE INDEX device_lists_remote_resync_idx ON device_lists_remote_resync (user_id);
CREATE INDEX device_lists_remote_resync_ts_idx ON device_lists_remote_resync (added_ts);