Skip to content

Commit

Permalink
Prevent threads code from making identical simultaneous API hits (#3541)
Browse files Browse the repository at this point in the history
  • Loading branch information
t3chguy authored Jul 7, 2023
1 parent 30dd289 commit cd7c519
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 120 deletions.
107 changes: 4 additions & 103 deletions spec/integ/matrix-client-event-timeline.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,6 @@ describe("MatrixClient event timelines", function () {
await client.stopClient(); // we don't need the client to be syncing at this time
const room = client.getRoom(roomId)!;

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
Expand Down Expand Up @@ -634,12 +628,6 @@ describe("MatrixClient event timelines", function () {
const thread = room.createThread(THREAD_ROOT.event_id!, undefined, [], false);
await httpBackend.flushAllExpected();
const timelineSet = thread.timelineSet;
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
await flushHttp(emitPromise(thread, ThreadEvent.Update));

const timeline = await client.getEventTimeline(timelineSet, THREAD_REPLY.event_id!);

Expand Down Expand Up @@ -1510,7 +1498,8 @@ describe("MatrixClient event timelines", function () {
},
event: true,
});
THREAD_REPLY2.localTimestamp += 1000;
// this has to come after THREAD_REPLY which hasn't been instantiated by us
THREAD_REPLY2.localTimestamp += 10000000;

// Test data for the first thread, with the second reply
const THREAD_ROOT_UPDATED = {
Expand Down Expand Up @@ -1570,9 +1559,6 @@ describe("MatrixClient event timelines", function () {
thread.initialEventsFetched = true;
const prom = emitPromise(room, ThreadEvent.NewReply);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD2_ROOT);
await room.addLiveEvents([THREAD_REPLY2]);
await httpBackend.flushAllExpected();
Expand Down Expand Up @@ -1699,13 +1685,11 @@ describe("MatrixClient event timelines", function () {
thread.initialEventsFetched = true;
const prom = emitPromise(room, ThreadEvent.Update);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD2_ROOT);
await room.addLiveEvents([THREAD_REPLY_REACTION]);
await httpBackend.flushAllExpected();
await prom;
expect(thread.length).toBe(2);
expect(thread.length).toBe(1); // reactions don't count towards the length of a thread
// Test thread order is unchanged
expect(timeline!.getEvents().map((it) => it.event.event_id)).toEqual([
THREAD_ROOT.event_id,
Expand Down Expand Up @@ -2021,97 +2005,14 @@ describe("MatrixClient event timelines", function () {
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({ dir: Direction.Backward, limit: 1 }),
)
.respond(200, function () {
return {
chunk: [THREAD_REPLY],
};
});
await Promise.all([httpBackend.flushAllExpected(), utils.syncPromise(client)]);

const room = client.getRoom(roomId)!;
const thread = room.getThread(THREAD_ROOT.event_id!)!;
expect(thread.initialEventsFetched).toBeTruthy();
const timelineSet = thread.timelineSet;

httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/event/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return THREAD_ROOT;
});
httpBackend
.when("GET", "/rooms/!foo%3Abar/context/" + encodeURIComponent(THREAD_ROOT.event_id!))
.respond(200, function () {
return {
start: "start_token",
events_before: [],
event: THREAD_ROOT,
events_after: [],
end: "end_token",
state: [],
};
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({
dir: Direction.Backward,
from: "start_token",
}),
)
.respond(200, function () {
return {
chunk: [],
};
});
httpBackend
.when(
"GET",
"/_matrix/client/v1/rooms/!foo%3Abar/relations/" +
encodeURIComponent(THREAD_ROOT.event_id!) +
"/" +
encodeURIComponent(THREAD_RELATION_TYPE.name) +
buildRelationPaginationQuery({ dir: Direction.Forward, from: "end_token" }),
)
.respond(200, function () {
return {
chunk: [THREAD_REPLY],
};
});

const timeline = await flushHttp(client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!));
const timeline = await client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!);

httpBackend.when("GET", "/sync").respond(200, {
next_batch: "s_5_5",
Expand Down
2 changes: 1 addition & 1 deletion spec/test-utils/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export const mkThread = ({
room?.reEmitter.reEmit(evt, [MatrixEventEvent.BeforeRedaction]);
}

const thread = room.createThread(rootEvent.getId() ?? "", rootEvent, events, true);
const thread = room.createThread(rootEvent.getId() ?? "", rootEvent, [rootEvent, ...events], true);

return { thread, rootEvent, events };
};
12 changes: 6 additions & 6 deletions spec/unit/models/thread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { mocked } from "jest-mock";

import { MatrixClient, PendingEventOrdering } from "../../../src/client";
import { Room, RoomEvent } from "../../../src/models/room";
import { Thread, THREAD_RELATION_TYPE, ThreadEvent, FeatureSupport } from "../../../src/models/thread";
import { FeatureSupport, Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/thread";
import { makeThreadEvent, mkThread } from "../../test-utils/thread";
import { TestClient } from "../../TestClient";
import { emitPromise, mkEdit, mkMessage, mkReaction, mock } from "../../test-utils/test-utils";
Expand All @@ -43,6 +43,7 @@ describe("Thread", () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, { timelineSupport: false });
const client = testClient.client;
client.supportsThreads = jest.fn().mockReturnValue(true);
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});
Expand Down Expand Up @@ -300,6 +301,7 @@ describe("Thread", () => {
timelineSupport: false,
});
const client = testClient.client;
client.supportsThreads = jest.fn().mockReturnValue(true);
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});
Expand Down Expand Up @@ -354,6 +356,7 @@ describe("Thread", () => {
timelineSupport: false,
});
const client = testClient.client;
client.supportsThreads = jest.fn().mockReturnValue(true);
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});
Expand Down Expand Up @@ -405,6 +408,7 @@ describe("Thread", () => {
timelineSupport: false,
});
const client = testClient.client;
client.supportsThreads = jest.fn().mockReturnValue(true);
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});
Expand Down Expand Up @@ -699,11 +703,7 @@ async function createThread(client: MatrixClient, user: string, roomId: string):
root.setThreadId(root.getId());
await room.addLiveEvents([root]);

// Create the thread and wait for it to be initialised
const thread = room.createThread(root.getId()!, root, [], false);
await new Promise<void>((res) => thread.once(RoomEvent.TimelineReset, () => res()));

return thread;
return room.createThread(root.getId()!, root, [], false);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2787,10 +2787,10 @@ describe("Room", function () {
let prom = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1]);
const thread: Thread = await prom;
await emitPromise(room, ThreadEvent.Update);

expect(thread.initialEventsFetched).toBeTruthy();
await room.addLiveEvents([threadResponse2]);
await emitPromise(room, ThreadEvent.Update);
expect(thread).toHaveLength(2);
expect(thread.replyToEvent!.getId()).toBe(threadResponse2.getId());

Expand Down
55 changes: 46 additions & 9 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
public readonly room: Room;
public readonly client: MatrixClient;
private readonly pendingEventOrdering: PendingEventOrdering;
private processRootEventPromise?: Promise<void>;

public initialEventsFetched = !Thread.hasServerSideSupport;
/**
Expand Down Expand Up @@ -134,6 +135,7 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
this.room.on(MatrixEventEvent.BeforeRedaction, this.onBeforeRedaction);
this.room.on(RoomEvent.Redaction, this.onRedaction);
this.room.on(RoomEvent.LocalEchoUpdated, this.onLocalEcho);
this.room.on(RoomEvent.TimelineReset, this.onTimelineReset);
this.timelineSet.on(RoomEvent.Timeline, this.onTimelineEvent);

this.processReceipts(opts.receipts);
Expand All @@ -144,6 +146,12 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
this.setEventMetadata(this.rootEvent);
}

private onTimelineReset = async (): Promise<void> => {
// We hit a gappy sync, ask the server for an update
await this.processRootEventPromise;
this.processRootEventPromise = undefined;
};

private async fetchRootEvent(): Promise<void> {
this.rootEvent = this.room.findEventById(this.id);
// If the rootEvent does not exist in the local stores, then fetch it from the server.
Expand Down Expand Up @@ -197,6 +205,11 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
this._currentUserParticipated = false;
this.emit(ThreadEvent.Delete, this);
} else {
if (this.lastEvent?.getId() === event.getAssociatedId()) {
// XXX: If our last event got redacted we query the server for the last event once again
await this.processRootEventPromise;
this.processRootEventPromise = undefined;
}
await this.updateThreadMetadata();
}
};
Expand All @@ -212,6 +225,9 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
if (sender && room && this.shouldSendLocalEchoReceipt(sender, event)) {
room.addLocalEchoReceipt(sender, event, ReceiptType.Read);
}
if (event.getId() !== this.id && event.isRelation(THREAD_RELATION_TYPE.name)) {
this.replyCount++;
}
}
this.onEcho(event, toStartOfTimeline ?? false);
};
Expand Down Expand Up @@ -245,6 +261,8 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
await this.updateThreadMetadata();
if (!event.isRelation(THREAD_RELATION_TYPE.name)) return; // don't send a new reply event for reactions or edits
if (toStartOfTimeline) return; // ignore messages added to the start of the timeline
// Clear the lastEvent and instead start tracking locally using lastReply
this.lastEvent = undefined;
this.emit(ThreadEvent.NewReply, this, event);
};

Expand Down Expand Up @@ -308,6 +326,11 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
public async addEvent(event: MatrixEvent, toStartOfTimeline: boolean, emit = true): Promise<void> {
this.setEventMetadata(event);

if (!this.initialEventsFetched && !toStartOfTimeline && event.getId() === this.id) {
// We're loading the thread organically
this.initialEventsFetched = true;
}

const lastReply = this.lastReply();
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;

Expand Down Expand Up @@ -351,10 +374,14 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
return;
}

// If no thread support exists we want to count all thread relation
// added as a reply. We can't rely on the bundled relationships count
if ((!Thread.hasServerSideSupport || !this.rootEvent) && event.isRelation(THREAD_RELATION_TYPE.name)) {
this.replyCount++;
if (
event.getId() !== this.id &&
event.isRelation(THREAD_RELATION_TYPE.name) &&
!toStartOfTimeline &&
isNewestReply
) {
// Clear the last event as we have the latest end of the timeline
this.lastEvent = undefined;
}

if (emit) {
Expand Down Expand Up @@ -475,18 +502,26 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
}
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

private async updateThreadFromRootEvent(): Promise<void> {
if (Thread.hasServerSideSupport) {
// Ensure we show *something* as soon as possible, we'll update it as soon as we get better data, but we
// don't want the thread preview to be empty if we can avoid it
if (!this.initialEventsFetched) {
if (!this.initialEventsFetched && !this.lastEvent) {
await this.processRootEvent();
}
await this.fetchRootEvent();
}
await this.processRootEvent();
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

if (!this.processRootEventPromise) {
// We only want to do this once otherwise we end up rolling back to the last unsigned summary we have for the thread
this.processRootEventPromise = this.updateThreadFromRootEvent();
}
await this.processRootEventPromise;

if (!this.initialEventsFetched) {
this.initialEventsFetched = true;
Expand Down Expand Up @@ -572,7 +607,9 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
/**
* Return last reply to the thread, if known.
*/
public lastReply(matches: (ev: MatrixEvent) => boolean = (): boolean => true): MatrixEvent | null {
public lastReply(
matches: (ev: MatrixEvent) => boolean = (ev): boolean => ev.isRelation(RelationType.Thread),
): MatrixEvent | null {
for (let i = this.timeline.length - 1; i >= 0; i--) {
const event = this.timeline[i];
if (matches(event)) {
Expand Down

0 comments on commit cd7c519

Please sign in to comment.