From 377349eacb3fa82f2ddd0eb7a6f0cbb439b40f0e Mon Sep 17 00:00:00 2001 From: Michal Srb Date: Thu, 22 Aug 2024 19:51:41 +0200 Subject: [PATCH] JS: Prevent unpause local state sending duplicate queries (#29115) GitOrigin-RevId: 2bcf0f89670374d2f248e076da8365ee57bb6d87 --- .../convex/src/browser/sync/client.ts | 6 +- .../convex/src/browser/sync/local_state.ts | 78 ++++---- .../convex/src/react/auth_websocket.test.tsx | 189 +++++++++++++++++- 3 files changed, 226 insertions(+), 47 deletions(-) diff --git a/npm-packages/convex/src/browser/sync/client.ts b/npm-packages/convex/src/browser/sync/client.ts index 3066c82d..d4b183f6 100644 --- a/npm-packages/convex/src/browser/sync/client.ts +++ b/npm-packages/convex/src/browser/sync/client.ts @@ -294,11 +294,7 @@ export class BaseConvexClient { } }, onResume: () => { - const remoteQueryResults = new Set( - this.remoteQuerySet.remoteQueryResults().keys(), - ); - const [querySetModification, authModification] = - this.state.resume(remoteQueryResults); + const [querySetModification, authModification] = this.state.resume(); if (authModification) { this.webSocketManager.sendMessage(authModification); } diff --git a/npm-packages/convex/src/browser/sync/local_state.ts b/npm-packages/convex/src/browser/sync/local_state.ts index 1d57dd88..0d9ce66b 100644 --- a/npm-packages/convex/src/browser/sync/local_state.ts +++ b/npm-packages/convex/src/browser/sync/local_state.ts @@ -40,6 +40,7 @@ export class LocalSyncState { private readonly outstandingQueriesOlderThanRestart: Set; private outstandingAuthOlderThanRestart: boolean; private paused: boolean; + private pendingQuerySetModifications: Map; constructor() { this.nextQueryId = 0; @@ -50,6 +51,7 @@ export class LocalSyncState { this.outstandingQueriesOlderThanRestart = new Set(); this.outstandingAuthOlderThanRestart = false; this.paused = false; + this.pendingQuerySetModifications = new Map(); } hasSyncedPastLastReconnect(): boolean { @@ -100,10 +102,6 @@ export class LocalSyncState { const baseVersion = this.querySetVersion; const newVersion = this.querySetVersion + 1; - if (!this.paused) { - this.querySetVersion = newVersion; - } - const add: AddQuery = { type: "Add", queryId, @@ -112,6 +110,13 @@ export class LocalSyncState { journal, componentPath, }; + + if (this.paused) { + this.pendingQuerySetModifications.set(queryId, add); + } else { + this.querySetVersion = newVersion; + } + const modification: QuerySetModification = { type: "ModifyQuerySet", baseVersion, @@ -259,7 +264,12 @@ export class LocalSyncState { restart( oldRemoteQueryResults: Set, ): [QuerySetModification, Authenticate?] { - this.paused = false; + // Restart works whether we are paused or unpaused. + // The `this.pendingQuerySetModifications` is not used + // when restarting as the AddQuery and RemoveQuery are computed + // from scratch, based on the old remote query results, here. + this.unpause(); + this.outstandingQueriesOlderThanRestart.clear(); const modifications = []; for (const localQuery of this.querySet.values()) { @@ -302,44 +312,16 @@ export class LocalSyncState { this.paused = true; } - resume( - remoteQueryResults: Set, - ): [QuerySetModification?, Authenticate?] { - this.paused = false; - const localQueryIds = new Set(); - const modifications = []; - for (const localQuery of this.querySet.values()) { - localQueryIds.add(localQuery.id); - - if (!remoteQueryResults.has(localQuery.id)) { - const add: AddQuery = { - type: "Add", - queryId: localQuery.id, - udfPath: localQuery.canonicalizedUdfPath, - args: [convexToJson(localQuery.args)], - journal: localQuery.journal, - }; - modifications.push(add); - } - } - - for (const remoteQueryId of remoteQueryResults) { - if (!localQueryIds.has(remoteQueryId)) { - const remove: RemoveQuery = { - type: "Remove", - queryId: remoteQueryId, - }; - modifications.push(remove); - } - } - + resume(): [QuerySetModification?, Authenticate?] { const querySet: QuerySetModification | undefined = - modifications.length > 0 + this.pendingQuerySetModifications.size > 0 ? { type: "ModifyQuerySet", baseVersion: this.querySetVersion, newVersion: ++this.querySetVersion, - modifications, + modifications: Array.from( + this.pendingQuerySetModifications.values(), + ), } : undefined; const authenticate: Authenticate | undefined = @@ -350,9 +332,17 @@ export class LocalSyncState { ...this.auth, } : undefined; + + this.unpause(); + return [querySet, authenticate]; } + private unpause() { + this.paused = false; + this.pendingQuerySetModifications.clear(); + } + private removeSubscriber( queryToken: QueryToken, ): QuerySetModification | null { @@ -367,13 +357,19 @@ export class LocalSyncState { this.outstandingQueriesOlderThanRestart.delete(localQuery.id); const baseVersion = this.querySetVersion; const newVersion = this.querySetVersion + 1; - if (!this.paused) { - this.querySetVersion = newVersion; - } const remove: RemoveQuery = { type: "Remove", queryId: localQuery.id, }; + if (this.paused) { + if (this.pendingQuerySetModifications.has(localQuery.id)) { + this.pendingQuerySetModifications.delete(localQuery.id); + } else { + this.pendingQuerySetModifications.set(localQuery.id, remove); + } + } else { + this.querySetVersion = newVersion; + } return { type: "ModifyQuerySet", baseVersion, diff --git a/npm-packages/convex/src/react/auth_websocket.test.tsx b/npm-packages/convex/src/react/auth_websocket.test.tsx index ee6bd3cf..b91973a3 100644 --- a/npm-packages/convex/src/react/auth_websocket.test.tsx +++ b/npm-packages/convex/src/react/auth_websocket.test.tsx @@ -441,7 +441,7 @@ describe.sequential.skip("auth websocket tests", () => { baseVersion: 1, }); - // Make sure we resume + // Make sure we are now unpaused client.watchQuery(anyApi.myQuery.default).onUpdate(() => {}); @@ -460,4 +460,191 @@ describe.sequential.skip("auth websocket tests", () => { }); }); }); + + test("Local state resume doesn't cause duplicate AddQuery", async () => { + await withInMemoryWebSocket(async ({ address, receive }) => { + const client = testReactClient(address); + + // First we subscribe + client.watchQuery(anyApi.myQuery.default).onUpdate(() => {}); + + expect((await receive()).type).toEqual("Connect"); + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 0 }], + baseVersion: 0, + }); + + // Before the server confirms, we set auth, leading to pause + // and unpause. + const tokenFetcher = vi.fn(async () => + jwtEncode({ iat: 1234500, exp: 1244500 }, "secret"), + ); + client.setAuth(tokenFetcher); + + // We should only send Authenticate, since we already + // sent the Add modification + expect(await receive()).toMatchObject({ + type: "Authenticate", + baseVersion: 0, + }); + + // Subscribe again + client + .watchQuery(anyApi.myQuery.default, { foo: "bla" }) + .onUpdate(() => {}); + // Now we're sending the second query, not the first! + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 1 }], + baseVersion: 1, + }); + }); + }); + + test("Local state resume doesn't send both Add and Remove", async () => { + await withInMemoryWebSocket(async ({ address, receive }) => { + const client = testReactClient(address); + + // First we subscribe to kick off connect. + client.watchQuery(anyApi.myQuery.default).onUpdate(() => {}); + + expect((await receive()).type).toEqual("Connect"); + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 0 }], + baseVersion: 0, + }); + + // Set slow auth, causing pause + let resolve: (value: string) => void; + const tokenFetcher2 = vi.fn( + () => + new Promise((r) => { + resolve = r; + }), + ); + client.setAuth(tokenFetcher2); + + // Subscribe to second query, while paused + const unsubscribe = client + .watchQuery(anyApi.myQuery.default, { foo: "bla" }) + .onUpdate(() => {}); + + // Subscribe third query, while paused + client + .watchQuery(anyApi.myQuery.default, { foo: "da" }) + .onUpdate(() => {}); + + // Unsubscribe from second query, while paused + unsubscribe(); + + // Unpause ie resume + resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret")); + + // We authenticate first + expect(await receive()).toMatchObject({ + type: "Authenticate", + baseVersion: 0, + }); + + // We subscribe to the third query only! + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 2 }], + baseVersion: 1, + }); + }); + }); + + test("Local state resume refcounts", async () => { + await withInMemoryWebSocket(async ({ address, receive }) => { + const client = testReactClient(address); + + // First we subscribe to kick off connect. + client.watchQuery(anyApi.myQuery.default).onUpdate(() => {}); + + expect((await receive()).type).toEqual("Connect"); + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 0 }], + baseVersion: 0, + }); + + // Set slow auth, causing pause + let resolve: (value: string) => void; + const tokenFetcher2 = vi.fn( + () => + new Promise((r) => { + resolve = r; + }), + ); + client.setAuth(tokenFetcher2); + + // Subscribe to second query, while paused + const unsubscribe = client + .watchQuery(anyApi.myQuery.default, { foo: "bla" }) + .onUpdate(() => {}); + + // Subscribe to the same query, while paused + client + .watchQuery(anyApi.myQuery.default, { foo: "bla" }) + .onUpdate(() => {}); + + // Unsubscribe once from the second query, while paused + unsubscribe(); + + // Unpause ie resume + resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret")); + + // We authenticate first + expect(await receive()).toMatchObject({ + type: "Authenticate", + baseVersion: 0, + }); + + // We subscribe to the second query, because there's still one subscriber + // on it. + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + modifications: [{ type: "Add", queryId: 1 }], + baseVersion: 1, + }); + }); + }); + + test("Local state restart doesn't send both Add and Remove", async () => { + await withInMemoryWebSocket(async ({ address, receive }) => { + const client = testReactClient(address); + + // Set slow auth, causing pause while connecting + let resolve: (value: string) => void; + const tokenFetcher2 = vi.fn( + () => + new Promise((r) => { + resolve = r; + }), + ); + client.setAuth(tokenFetcher2); + + // Subscribe while paused and connecting + const unsubscribe = client + .watchQuery(anyApi.myQuery.default) + .onUpdate(() => {}); + + // Unsubscribe while paused and connecting + unsubscribe(); + + // Unpause + resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret")); + + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect(await receive()).toMatchObject({ + type: "ModifyQuerySet", + baseVersion: 0, + modifications: [], + }); + }); + }); });