Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Properly bump the last_active_ts.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Aug 15, 2023
1 parent d7302fd commit 1e703b5
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 30 deletions.
13 changes: 13 additions & 0 deletions synapse/api/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ class UserDevicePresenceState:
last_active_ts: int
last_sync_ts: int

@classmethod
def default(
cls, user_id: str, device_id: Optional[str]
) -> "UserDevicePresenceState":
"""Returns a default presence state."""
return cls(
user_id=user_id,
device_id=device_id,
state=PresenceState.OFFLINE,
last_active_ts=0,
last_sync_ts=0,
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState:
Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,10 @@ async def persist_and_notify_client_events(
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
"bump_presence_active_time", self._bump_active_time, requester.user
"bump_presence_active_time",
self._bump_active_time,
requester.user,
requester.device_id,
)

async def _notify() -> None:
Expand Down Expand Up @@ -1958,10 +1961,10 @@ async def _maybe_kick_guest_users(
logger.info("maybe_kick_guest_users %r", current_state)
await self.hs.get_room_member_handler().kick_guest_users(current_state)

async def _bump_active_time(self, user: UserID) -> None:
async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None:
try:
presence = self.hs.get_presence_handler()
await presence.bump_presence_active_time(user)
await presence.bump_presence_active_time(user, device_id)
except Exception:
logger.exception("Error bumping presence active time")

Expand Down
56 changes: 35 additions & 21 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ async def set_state(
"""

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand Down Expand Up @@ -652,7 +654,9 @@ async def set_state(
is_sync=is_sync,
)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -663,7 +667,9 @@ async def bump_presence_active_time(self, user: UserID) -> None:
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
)


Expand Down Expand Up @@ -977,7 +983,9 @@ async def _handle_timeouts(self) -> None:

return await self._update_states(changes)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -989,11 +997,26 @@ async def bump_presence_active_time(self, user: UserID) -> None:

bump_active_time_counter.inc()

prev_state = await self.current_state_for_user(user_id)
now = self.clock.time_msec()

new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
if prev_state.state == PresenceState.UNAVAILABLE:
new_fields["state"] = PresenceState.ONLINE
# Update the device information & mark the device as online if it was
# unavailable.
devices = self.user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState.default(user_id, device_id),
)
device_state.last_active_ts = now
if device_state.state == PresenceState.UNAVAILABLE:
device_state.state = PresenceState.ONLINE

# Update the user state, if this will always update last_active_ts and
# might update the presence state.
prev_state = await self.current_state_for_user(user_id)
new_fields: Dict[str, Any] = {
"last_active_ts": now,
"presence": _combine_device_states(devices.values()),
}

await self._update_states([prev_state.copy_and_replace(**new_fields)])

Expand Down Expand Up @@ -1272,27 +1295,18 @@ async def set_state(
prev_state = await self.current_state_for_user(user_id)

# Always update the device specific information.
device_state = self.user_to_device_to_current_state.setdefault(
user_id, {}
).setdefault(
devices = self.user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState(
user_id,
device_id,
presence,
last_active_ts=self.clock.time_msec(),
last_sync_ts=0,
),
UserDevicePresenceState.default(user_id, device_id),
)
device_state.state = presence
device_state.last_active_ts = now
if is_sync:
device_state.last_sync_ts = now

# Based on (all) the user's devices calculate the new presence state.
presence = _combine_device_states(
self.user_to_device_to_current_state[user_id].values()
)
presence = _combine_device_states(devices.values())

# The newly updated status as an amalgamation of all the device statuses.
new_fields = {"state": presence}
Expand Down
7 changes: 4 additions & 3 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _serialize_payload(user_id: str, device_id: Optional[str]) -> JsonDict: # type: ignore[override]
return {"device_id": device_id}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
await self._presence_handler.bump_presence_active_time(
UserID.from_string(user_id)
UserID.from_string(user_id),
content.get("device_id"),
)

return (200, {})
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ async def on_POST(
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

body = parse_json_object_from_request(request)

Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ async def on_POST(
Codes.INVALID_PARAM,
)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

if receipt_type == ReceiptTypes.FULLY_READ:
await self.read_marker_handler.received_client_read_marker(
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,9 @@ async def on_PUT(

content = parse_json_object_from_request(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
Expand Down

0 comments on commit 1e703b5

Please sign in to comment.