Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport staging] Correctly handle limited sync responses by resetting the thread timeline #3069

Merged
merged 1 commit into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 140 additions & 1 deletion spec/unit/models/thread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/t
import { mkThread } from "../../test-utils/thread";
import { TestClient } from "../../TestClient";
import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils";
import { EventStatus, MatrixEvent } from "../../../src";
import { Direction, EventStatus, MatrixEvent } from "../../../src";
import { ReceiptType } from "../../../src/@types/read_receipts";
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
import { ReEmitter } from "../../../src/ReEmitter";
Expand Down Expand Up @@ -283,4 +283,143 @@ describe("Thread", () => {
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
});
});

describe("resetLiveTimeline", () => {
// ResetLiveTimeline is used when we have missing messages between the current live timeline's end and newly
// received messages. In that case, we want to replace the existing live timeline. To ensure pagination
// continues working correctly, new pagination tokens need to be set on both the old live timeline (which is
// now a regular timeline) and the new live timeline.
it("replaces the live timeline and correctly sets pagination tokens", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);

jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
Promise.resolve({
chunk: [],
start: `${token}-new`,
end: `${token}-new`,
}),
);

function timelines(): [string | null, string | null][] {
return thread.timelineSet
.getTimelines()
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
}

expect(timelines()).toEqual([[null, null]]);
const promise = thread.resetLiveTimeline("b1", "f1");
expect(timelines()).toEqual([
[null, "f1"],
["b1", null],
]);
await promise;
expect(timelines()).toEqual([
[null, "f1-new"],
["b1-new", null],
]);
});

// As the pagination tokens cannot be used right now, resetLiveTimeline needs to replace them before they can
// be used. But if in the future the bug in synapse is fixed, and they can actually be used, we can get into a
// state where the client has paginated (and changed the tokens) while resetLiveTimeline tries to set the
// corrected tokens. To prevent such a race condition, we make sure that resetLiveTimeline respects any
// changes done to the pagination tokens.
it("replaces the live timeline but does not replace changed pagination tokens", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);

jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
Promise.resolve({
chunk: [],
start: `${token}-new`,
end: `${token}-new`,
}),
);

function timelines(): [string | null, string | null][] {
return thread.timelineSet
.getTimelines()
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
}

expect(timelines()).toEqual([[null, null]]);
const promise = thread.resetLiveTimeline("b1", "f1");
expect(timelines()).toEqual([
[null, "f1"],
["b1", null],
]);
thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward);
thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward);
await promise;
expect(timelines()).toEqual([
[null, "f2"],
["b2", null],
]);
});

it("is correctly called by the room", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);
const mock = jest.spyOn(thread, "resetLiveTimeline");
mock.mockReturnValue(Promise.resolve());

room.resetLiveTimeline("b1", "f1");
expect(mock).toHaveBeenCalledWith("b1", "f1");
});
});
});
5 changes: 4 additions & 1 deletion src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
for (const timelineSet of this.timelineSets) {
timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
}
for (const thread of this.threads.values()) {
thread.resetLiveTimeline(backPaginationToken, forwardPaginationToken);
}

this.fixUpLegacyTimelineFields();
}
Expand Down Expand Up @@ -1223,7 +1226,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
const event = this.findEventById(eventId);
const thread = this.findThreadForEvent(event);
if (thread) {
return thread.timelineSet.getLiveTimeline();
return thread.timelineSet.getTimelineForEvent(eventId);
} else {
return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId);
}
Expand Down
59 changes: 58 additions & 1 deletion src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
this.setEventMetadata(event);

const lastReply = this.lastReply();
const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp;
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;

// Add all incoming events to the thread's timeline set when there's no server support
if (!Thread.hasServerSideSupport) {
Expand Down Expand Up @@ -358,6 +358,63 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
this.pendingReplyCount = pendingEvents.length;
}

/**
* Reset the live timeline of all timelineSets, and start new ones.
*
* <p>This is used when /sync returns a 'limited' timeline. 'Limited' means that there's a gap between the messages
* /sync returned, and the last known message in our timeline. In such a case, our live timeline isn't live anymore
* and has to be replaced by a new one. To make sure we can continue paginating our timelines correctly, we have to
* set new pagination tokens on the old and the new timeline.
*
* @param backPaginationToken - token for back-paginating the new timeline
* @param forwardPaginationToken - token for forward-paginating the old live timeline,
* if absent or null, all timelines are reset, removing old ones (including the previous live
* timeline which would otherwise be unable to paginate forwards without this token).
* Removing just the old live timeline whilst preserving previous ones is not supported.
*/
public async resetLiveTimeline(
backPaginationToken?: string | null,
forwardPaginationToken?: string | null,
): Promise<void> {
const oldLive = this.liveTimeline;
this.timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
const newLive = this.liveTimeline;

// FIXME: Remove the following as soon as https://github.com/matrix-org/synapse/issues/14830 is resolved.
//
// The pagination API for thread timelines currently can't handle the type of pagination tokens returned by sync
//
// To make this work anyway, we'll have to transform them into one of the types that the API can handle.
// One option is passing the tokens to /messages, which can handle sync tokens, and returns the right format.
// /messages does not return new tokens on requests with a limit of 0.
// This means our timelines might overlap a slight bit, but that's not an issue, as we deduplicate messages
// anyway.

let newBackward: string | undefined;
let oldForward: string | undefined;
if (backPaginationToken) {
const res = await this.client.createMessagesRequest(this.roomId, backPaginationToken, 1, Direction.Forward);
newBackward = res.end;
}
if (forwardPaginationToken) {
const res = await this.client.createMessagesRequest(
this.roomId,
forwardPaginationToken,
1,
Direction.Backward,
);
oldForward = res.start;
}
// Only replace the token if we don't have paginated away from this position already. This situation doesn't
// occur today, but if the above issue is resolved, we'd have to go down this path.
if (forwardPaginationToken && oldLive.getPaginationToken(Direction.Forward) === forwardPaginationToken) {
oldLive.setPaginationToken(oldForward ?? null, Direction.Forward);
}
if (backPaginationToken && newLive.getPaginationToken(Direction.Backward) === backPaginationToken) {
newLive.setPaginationToken(newBackward ?? null, Direction.Backward);
}
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

Expand Down