Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add debug logging for issue #9533 #9959

Merged
merged 2 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9959.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add debug logging for lost/delayed to-device messages.
9 changes: 9 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -574,6 +575,14 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
for content in contents
]

if edus:
issue9533_logger.debug(
"Sending %i to-device messages to %s, up to stream id %i",
len(edus),
self._destination,
stream_id,
)

return (edus, stream_id)

def _start_catching_up(self) -> None:
Expand Down
7 changes: 6 additions & 1 deletion synapse/logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# These are imported to allow for nicer logging configuration files.
import logging

from synapse.logging._remote import RemoteHandler
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter

# These are imported to allow for nicer logging configuration files.
__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]

# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
issue9533_logger = logging.getLogger("synapse.9533_debug")
8 changes: 8 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
Expand Down Expand Up @@ -426,6 +427,13 @@ def on_new_event(
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())

if stream_key == "to_device_key":
issue9533_logger.debug(
"to-device messages stream id %s, awaking streams for %s",
new_token,
users,
)

time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
Expand Down
1 change: 0 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

logger = logging.getLogger(__name__)


# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from typing import List, Optional, Tuple

from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
Expand Down Expand Up @@ -404,6 +405,13 @@ def add_messages_txn(txn, now_ms, stream_id):
],
)

if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)

async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
Expand Down Expand Up @@ -533,6 +541,16 @@ def _add_messages_to_local_device_inbox_txn(
],
)

issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
Expand Down