Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent threads code from making identical simultaneous API hits #3541

Merged
merged 9 commits into from
Jul 7, 2023
109 changes: 5 additions & 104 deletions spec/integ/matrix-client-event-timeline.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,6 @@ describe("MatrixClient event timelines", function () {
await client.stopClient(); // we don't need the client to be syncing at this time
const room = client.getRoom(roomId)!;

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
Expand Down Expand Up @@ -634,12 +628,6 @@ describe("MatrixClient event timelines", function () {
const thread = room.createThread(THREAD_ROOT.event_id!, undefined, [], false);
await httpBackend.flushAllExpected();
const timelineSet = thread.timelineSet;
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
await flushHttp(emitPromise(thread, ThreadEvent.Update));

const timeline = await client.getEventTimeline(timelineSet, THREAD_REPLY.event_id!);

Expand Down Expand Up @@ -1510,7 +1498,8 @@ describe("MatrixClient event timelines", function () {
},
event: true,
});
THREAD_REPLY2.localTimestamp += 1000;
// this has to come after THREAD_REPLY which hasn't been instantiated by us
THREAD_REPLY2.localTimestamp += 10000000;

// Test data for the first thread, with the second reply
const THREAD_ROOT_UPDATED = {
Expand Down Expand Up @@ -1570,14 +1559,11 @@ describe("MatrixClient event timelines", function () {
thread.initialEventsFetched = true;
const prom = emitPromise(room, ThreadEvent.NewReply);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD2_ROOT);
await room.addLiveEvents([THREAD_REPLY2]);
await httpBackend.flushAllExpected();
await prom;
expect(thread.length).toBe(2);
expect(thread.length).toBe(1);
// Test threads are in chronological order
expect(timeline!.getEvents().map((it) => it.event.event_id)).toEqual([
THREAD2_ROOT.event_id,
Expand Down Expand Up @@ -1699,13 +1685,11 @@ describe("MatrixClient event timelines", function () {
thread.initialEventsFetched = true;
const prom = emitPromise(room, ThreadEvent.Update);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD2_ROOT);
await room.addLiveEvents([THREAD_REPLY_REACTION]);
await httpBackend.flushAllExpected();
await prom;
expect(thread.length).toBe(2);
expect(thread.length).toBe(1); // reactions don't count towards the length of a thread
// Test thread order is unchanged
expect(timeline!.getEvents().map((it) => it.event.event_id)).toEqual([
THREAD_ROOT.event_id,
Expand Down Expand Up @@ -2021,97 +2005,14 @@ describe("MatrixClient event timelines", function () {
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({ dir: Direction.Backward, limit: 1 }),
)
.respond(200, function () {
return {
chunk: [THREAD_REPLY],
};
});
await Promise.all([httpBackend.flushAllExpected(), utils.syncPromise(client)]);

const room = client.getRoom(roomId)!;
const thread = room.getThread(THREAD_ROOT.event_id!)!;
expect(thread.initialEventsFetched).toBeTruthy();
const timelineSet = thread.timelineSet;

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/context/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return {
start: "start_token",
events_before: [],
event: THREAD_ROOT,
events_after: [],
end: "end_token",
state: [],
};
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({
dir: Direction.Backward,
from: "start_token",
}),
)
.respond(200, function () {
return {
chunk: [],
};
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({ dir: Direction.Forward, from: "end_token" }),
)
.respond(200, function () {
return {
chunk: [THREAD_REPLY],
};
});

const timeline = await flushHttp(client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!));
const timeline = await client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!);

httpBackend.when("GET", "/sync").respond(200, {
next_batch: "s_5_5",
Expand Down
6 changes: 1 addition & 5 deletions spec/unit/models/thread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,11 +699,7 @@ async function createThread(client: MatrixClient, user: string, roomId: string):
root.setThreadId(root.getId());
await room.addLiveEvents([root]);

// Create the thread and wait for it to be initialised
const thread = room.createThread(root.getId()!, root, [], false);
await new Promise<void>((res) => thread.once(RoomEvent.TimelineReset, () => res()));

return thread;
return room.createThread(root.getId()!, root, [], false);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2787,10 +2787,10 @@ describe("Room", function () {
let prom = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1]);
const thread: Thread = await prom;
await emitPromise(room, ThreadEvent.Update);

expect(thread.initialEventsFetched).toBeTruthy();
await room.addLiveEvents([threadResponse2]);
await emitPromise(room, ThreadEvent.Update);
expect(thread).toHaveLength(2);
expect(thread.replyToEvent!.getId()).toBe(threadResponse2.getId());

Expand Down
26 changes: 22 additions & 4 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
public readonly room: Room;
public readonly client: MatrixClient;
private readonly pendingEventOrdering: PendingEventOrdering;
private processRootEventPromise?: Promise<void>;

public initialEventsFetched = !Thread.hasServerSideSupport;
/**
Expand Down Expand Up @@ -197,6 +198,10 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
this._currentUserParticipated = false;
this.emit(ThreadEvent.Delete, this);
} else {
if (this.lastEvent?.getId() === event.getAssociatedId()) {
// XXX: If our last event got redacted we query the server for the last event once again
this.processRootEventPromise = undefined;
robintown marked this conversation as resolved.
Show resolved Hide resolved
}
await this.updateThreadMetadata();
}
};
Expand Down Expand Up @@ -308,6 +313,11 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
public async addEvent(event: MatrixEvent, toStartOfTimeline: boolean, emit = true): Promise<void> {
this.setEventMetadata(event);

if (!this.initialEventsFetched && !toStartOfTimeline && event.getId() === this.id) {
// We're loading the thread organically
this.initialEventsFetched = true;
}

const lastReply = this.lastReply();
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;

Expand Down Expand Up @@ -475,18 +485,26 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
}
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

private async updateThreadFromRootEvent(): Promise<void> {
if (Thread.hasServerSideSupport) {
// Ensure we show *something* as soon as possible, we'll update it as soon as we get better data, but we
// don't want the thread preview to be empty if we can avoid it
if (!this.initialEventsFetched) {
if (!this.initialEventsFetched && !this.lastEvent) {
await this.processRootEvent();
}
await this.fetchRootEvent();
}
await this.processRootEvent();
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

if (!this.processRootEventPromise) {
// We only want to do this once otherwise we end up rolling back to the last unsigned summary we have for the thread
this.processRootEventPromise = this.updateThreadFromRootEvent();
}
await this.processRootEventPromise;

if (!this.initialEventsFetched) {
this.initialEventsFetched = true;
Expand Down
Loading