Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Aggregate unread notif count query for badge count calculation #14255

Merged
merged 22 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14255.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).
26 changes: 7 additions & 19 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
from synapse.util.async_helpers import concurrently_execute


async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
Expand All @@ -26,23 +25,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -

badge = len(invites)

room_notifs = []

async def get_room_unread_count(room_id: str) -> None:
room_notifs.append(
await store.get_unread_event_push_actions_by_room_for_user(
room_id,
user_id,
)
)

await concurrently_execute(get_room_unread_count, joins, 10)

for notifs in room_notifs:
# Combine the counts from all the threads.
notify_count = notifs.main_timeline.notify_count + sum(
n.notify_count for n in notifs.threads.values()
)
room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
for room_id, notify_count in room_to_count.items():
if room_id not in joins:
continue
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

if notify_count == 0:
continue
Expand All @@ -51,8 +37,10 @@ async def get_room_unread_count(room_id: str) -> None:
# return one badge count per conversation
badge += 1
else:
# increment the badge count by the number of unread messages in the room
# Increase badge by number of notifications in room
# NOTE: this includes threaded notifications as well as non-threaded
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
badge += notify_count

return badge


Expand Down
136 changes: 136 additions & 0 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"""

import logging
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Collection,
Expand Down Expand Up @@ -463,6 +464,141 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:

return result

async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
"""Get the notification count by room for a user. Only considers notifications,
no highlights or unreads, and threads are currently aggregated under their room.
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

This function is intentionally not cached because it is called to calculate the
unread badge for notifications and thus the result is expected to change.
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

Note that this function assumes the user is a member of the room. Because
summary rows are not removed when a user leaves a room, the caller must
filter out those results from the result.
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
return await self.db_pool.runInteraction(
"get_unread_counts_by_room_for_user",
self._get_unread_counts_by_room_for_user_txn,
user_id,
)

def _get_unread_counts_by_room_for_user_txn(
self, txn: LoggingTransaction, user_id: str
) -> Dict[str, int]:
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)
args.extend([user_id, user_id])

room_to_count: Dict[str, int] = defaultdict(int)
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

# First get summary counts by room / thread for the user. Note we use a OR join
# condition here such that we handle both receipts with thread ID's and those
# without that get applied to any thread_id values.
sql = f"""
SELECT eps.room_id, notif_count
FROM event_push_summary AS eps
LEFT JOIN (
SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
GROUP BY room_id, thread_id
) AS receipts ON (
eps.room_id = receipts.room_id
AND eps.thread_id = receipts.thread_id
) OR (
eps.room_id = receipts.room_id
AND receipts.thread_id IS NULL
)
WHERE user_id = ?
AND notif_count != 0
AND (
(last_receipt_stream_ordering IS NULL AND stream_ordering > receipt_stream_ordering)
OR last_receipt_stream_ordering = receipt_stream_ordering
OR receipt_stream_ordering IS NULL
)
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
txn.execute(sql, args)

for room_id, notif_count in txn:
room_to_count[room_id] += notif_count

# Now get any event push actions that haven't been rotated using the same OR
# join and filter by receipt and event push summary rotated up to stream ordering.
sql = f"""
SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
FROM event_push_actions AS epa
LEFT JOIN (
SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
GROUP BY room_id, thread_id
) AS receipts ON (
epa.room_id = receipts.room_id
AND epa.thread_id = receipts.thread_id
) OR (
epa.room_id = receipts.room_id
AND receipts.thread_id IS NULL
)
WHERE user_id = ?
AND epa.notif = 1
AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering)
GROUP BY (epa.room_id)
"""
txn.execute(sql, args)

for room_id, notif_count in txn:
room_to_count[room_id] += notif_count

room_id_clause, room_id_args = make_in_list_sql_clause(
self.database_engine, "epa.room_id", room_to_count.keys()
)

# Finally re-check event_push_actions for any rooms not in the summary, ignoring
# the rotated up-to position. This handles the case where a read receipt has arrived
# but not been rotated meaning the summary table is out of date, so we go back to
# the push actions table.
sql = f"""
SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
FROM event_push_actions AS epa
LEFT JOIN (
SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
GROUP BY room_id, thread_id
) AS receipts ON (
epa.room_id = receipts.room_id
AND epa.thread_id = receipts.thread_id
) OR (
epa.room_id = receipts.room_id
AND receipts.thread_id IS NULL
)
WHERE user_id = ?
AND NOT {room_id_clause}
AND epa.notif = 1
AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering)
GROUP BY (epa.room_id)
"""

args.extend(room_id_args)
txn.execute(sql, args)

for room_id, notif_count in txn:
room_to_count[room_id] += notif_count

return room_to_count

@cached(tree=True, max_entries=5000, iterable=True)
async def get_unread_event_push_actions_by_room_for_user(
self,
Expand Down
9 changes: 9 additions & 0 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None:
)
self.assertEqual(counts.threads, {})

aggregate_counts = self.get_success(
self.store.db_pool.runInteraction(
"get-aggregate-unread-counts",
self.store._get_unread_counts_by_room_for_user_txn,
user_id,
)
)
self.assertEqual(sum(aggregate_counts.values()), noitf_count)
clokep marked this conversation as resolved.
Show resolved Hide resolved

def _create_event(highlight: bool = False) -> str:
result = self.helper.send_event(
room_id,
Expand Down