Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add debug logging for issue #9553
Browse files Browse the repository at this point in the history
Hopefully this will help us track down where to-device messages are getting
lost/delayed.
  • Loading branch information
richvdh committed May 10, 2021
1 parent 6c84778 commit 50e45ab
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog.d/9959.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add debug logging for lost/delayed to-device messages.
9 changes: 9 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -574,6 +575,14 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
for content in contents
]

if edus:
issue9533_logger.debug(
"Sending %i to-device messages to %s, up to stream id %i",
len(edus),
self._destination,
stream_id,
)

return (edus, stream_id)

def _start_catching_up(self) -> None:
Expand Down
5 changes: 5 additions & 0 deletions synapse/logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
# limitations under the License.

# These are imported to allow for nicer logging configuration files.
import logging

from synapse.logging._remote import RemoteHandler
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter

__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]

# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
issue9533_logger = logging.getLogger("synapse.9533_debug")
8 changes: 8 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
Expand Down Expand Up @@ -426,6 +427,13 @@ def on_new_event(
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())

if stream_key == "to_device_key":
issue9533_logger.debug(
"to-device messages stream id %s, awaking streams for %s",
new_token,
users,
)

time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
Expand Down
1 change: 0 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

logger = logging.getLogger(__name__)


# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from typing import List, Optional, Tuple

from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
Expand Down Expand Up @@ -404,6 +405,13 @@ def add_messages_txn(txn, now_ms, stream_id):
],
)

if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)

async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
Expand Down Expand Up @@ -533,6 +541,16 @@ def _add_messages_to_local_device_inbox_txn(
],
)

issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
Expand Down

0 comments on commit 50e45ab

Please sign in to comment.