Skip to content

Commit

Permalink
JS: Prevent unpause local state sending duplicate queries (#29115)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 2bcf0f89670374d2f248e076da8365ee57bb6d87
  • Loading branch information
xixixao authored and Convex, Inc. committed Aug 22, 2024
1 parent 609b105 commit 377349e
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 47 deletions.
6 changes: 1 addition & 5 deletions npm-packages/convex/src/browser/sync/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,7 @@ export class BaseConvexClient {
}
},
onResume: () => {
const remoteQueryResults = new Set(
this.remoteQuerySet.remoteQueryResults().keys(),
);
const [querySetModification, authModification] =
this.state.resume(remoteQueryResults);
const [querySetModification, authModification] = this.state.resume();
if (authModification) {
this.webSocketManager.sendMessage(authModification);
}
Expand Down
78 changes: 37 additions & 41 deletions npm-packages/convex/src/browser/sync/local_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class LocalSyncState {
private readonly outstandingQueriesOlderThanRestart: Set<QueryId>;
private outstandingAuthOlderThanRestart: boolean;
private paused: boolean;
private pendingQuerySetModifications: Map<QueryId, AddQuery | RemoveQuery>;

constructor() {
this.nextQueryId = 0;
Expand All @@ -50,6 +51,7 @@ export class LocalSyncState {
this.outstandingQueriesOlderThanRestart = new Set();
this.outstandingAuthOlderThanRestart = false;
this.paused = false;
this.pendingQuerySetModifications = new Map();
}

hasSyncedPastLastReconnect(): boolean {
Expand Down Expand Up @@ -100,10 +102,6 @@ export class LocalSyncState {
const baseVersion = this.querySetVersion;
const newVersion = this.querySetVersion + 1;

if (!this.paused) {
this.querySetVersion = newVersion;
}

const add: AddQuery = {
type: "Add",
queryId,
Expand All @@ -112,6 +110,13 @@ export class LocalSyncState {
journal,
componentPath,
};

if (this.paused) {
this.pendingQuerySetModifications.set(queryId, add);
} else {
this.querySetVersion = newVersion;
}

const modification: QuerySetModification = {
type: "ModifyQuerySet",
baseVersion,
Expand Down Expand Up @@ -259,7 +264,12 @@ export class LocalSyncState {
restart(
oldRemoteQueryResults: Set<QueryId>,
): [QuerySetModification, Authenticate?] {
this.paused = false;
// Restart works whether we are paused or unpaused.
// The `this.pendingQuerySetModifications` is not used
// when restarting as the AddQuery and RemoveQuery are computed
// from scratch, based on the old remote query results, here.
this.unpause();

this.outstandingQueriesOlderThanRestart.clear();
const modifications = [];
for (const localQuery of this.querySet.values()) {
Expand Down Expand Up @@ -302,44 +312,16 @@ export class LocalSyncState {
this.paused = true;
}

resume(
remoteQueryResults: Set<QueryId>,
): [QuerySetModification?, Authenticate?] {
this.paused = false;
const localQueryIds = new Set();
const modifications = [];
for (const localQuery of this.querySet.values()) {
localQueryIds.add(localQuery.id);

if (!remoteQueryResults.has(localQuery.id)) {
const add: AddQuery = {
type: "Add",
queryId: localQuery.id,
udfPath: localQuery.canonicalizedUdfPath,
args: [convexToJson(localQuery.args)],
journal: localQuery.journal,
};
modifications.push(add);
}
}

for (const remoteQueryId of remoteQueryResults) {
if (!localQueryIds.has(remoteQueryId)) {
const remove: RemoveQuery = {
type: "Remove",
queryId: remoteQueryId,
};
modifications.push(remove);
}
}

resume(): [QuerySetModification?, Authenticate?] {
const querySet: QuerySetModification | undefined =
modifications.length > 0
this.pendingQuerySetModifications.size > 0
? {
type: "ModifyQuerySet",
baseVersion: this.querySetVersion,
newVersion: ++this.querySetVersion,
modifications,
modifications: Array.from(
this.pendingQuerySetModifications.values(),
),
}
: undefined;
const authenticate: Authenticate | undefined =
Expand All @@ -350,9 +332,17 @@ export class LocalSyncState {
...this.auth,
}
: undefined;

this.unpause();

return [querySet, authenticate];
}

private unpause() {
this.paused = false;
this.pendingQuerySetModifications.clear();
}

private removeSubscriber(
queryToken: QueryToken,
): QuerySetModification | null {
Expand All @@ -367,13 +357,19 @@ export class LocalSyncState {
this.outstandingQueriesOlderThanRestart.delete(localQuery.id);
const baseVersion = this.querySetVersion;
const newVersion = this.querySetVersion + 1;
if (!this.paused) {
this.querySetVersion = newVersion;
}
const remove: RemoveQuery = {
type: "Remove",
queryId: localQuery.id,
};
if (this.paused) {
if (this.pendingQuerySetModifications.has(localQuery.id)) {
this.pendingQuerySetModifications.delete(localQuery.id);
} else {
this.pendingQuerySetModifications.set(localQuery.id, remove);
}
} else {
this.querySetVersion = newVersion;
}
return {
type: "ModifyQuerySet",
baseVersion,
Expand Down
189 changes: 188 additions & 1 deletion npm-packages/convex/src/react/auth_websocket.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ describe.sequential.skip("auth websocket tests", () => {
baseVersion: 1,
});

// Make sure we resume
// Make sure we are now unpaused

client.watchQuery(anyApi.myQuery.default).onUpdate(() => {});

Expand All @@ -460,4 +460,191 @@ describe.sequential.skip("auth websocket tests", () => {
});
});
});

test("Local state resume doesn't cause duplicate AddQuery", async () => {
await withInMemoryWebSocket(async ({ address, receive }) => {
const client = testReactClient(address);

// First we subscribe
client.watchQuery(anyApi.myQuery.default).onUpdate(() => {});

expect((await receive()).type).toEqual("Connect");
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 0 }],
baseVersion: 0,
});

// Before the server confirms, we set auth, leading to pause
// and unpause.
const tokenFetcher = vi.fn(async () =>
jwtEncode({ iat: 1234500, exp: 1244500 }, "secret"),
);
client.setAuth(tokenFetcher);

// We should only send Authenticate, since we already
// sent the Add modification
expect(await receive()).toMatchObject({
type: "Authenticate",
baseVersion: 0,
});

// Subscribe again
client
.watchQuery(anyApi.myQuery.default, { foo: "bla" })
.onUpdate(() => {});
// Now we're sending the second query, not the first!
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 1 }],
baseVersion: 1,
});
});
});

test("Local state resume doesn't send both Add and Remove", async () => {
await withInMemoryWebSocket(async ({ address, receive }) => {
const client = testReactClient(address);

// First we subscribe to kick off connect.
client.watchQuery(anyApi.myQuery.default).onUpdate(() => {});

expect((await receive()).type).toEqual("Connect");
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 0 }],
baseVersion: 0,
});

// Set slow auth, causing pause
let resolve: (value: string) => void;
const tokenFetcher2 = vi.fn(
() =>
new Promise<string>((r) => {
resolve = r;
}),
);
client.setAuth(tokenFetcher2);

// Subscribe to second query, while paused
const unsubscribe = client
.watchQuery(anyApi.myQuery.default, { foo: "bla" })
.onUpdate(() => {});

// Subscribe third query, while paused
client
.watchQuery(anyApi.myQuery.default, { foo: "da" })
.onUpdate(() => {});

// Unsubscribe from second query, while paused
unsubscribe();

// Unpause ie resume
resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret"));

// We authenticate first
expect(await receive()).toMatchObject({
type: "Authenticate",
baseVersion: 0,
});

// We subscribe to the third query only!
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 2 }],
baseVersion: 1,
});
});
});

test("Local state resume refcounts", async () => {
await withInMemoryWebSocket(async ({ address, receive }) => {
const client = testReactClient(address);

// First we subscribe to kick off connect.
client.watchQuery(anyApi.myQuery.default).onUpdate(() => {});

expect((await receive()).type).toEqual("Connect");
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 0 }],
baseVersion: 0,
});

// Set slow auth, causing pause
let resolve: (value: string) => void;
const tokenFetcher2 = vi.fn(
() =>
new Promise<string>((r) => {
resolve = r;
}),
);
client.setAuth(tokenFetcher2);

// Subscribe to second query, while paused
const unsubscribe = client
.watchQuery(anyApi.myQuery.default, { foo: "bla" })
.onUpdate(() => {});

// Subscribe to the same query, while paused
client
.watchQuery(anyApi.myQuery.default, { foo: "bla" })
.onUpdate(() => {});

// Unsubscribe once from the second query, while paused
unsubscribe();

// Unpause ie resume
resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret"));

// We authenticate first
expect(await receive()).toMatchObject({
type: "Authenticate",
baseVersion: 0,
});

// We subscribe to the second query, because there's still one subscriber
// on it.
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
modifications: [{ type: "Add", queryId: 1 }],
baseVersion: 1,
});
});
});

test("Local state restart doesn't send both Add and Remove", async () => {
await withInMemoryWebSocket(async ({ address, receive }) => {
const client = testReactClient(address);

// Set slow auth, causing pause while connecting
let resolve: (value: string) => void;
const tokenFetcher2 = vi.fn(
() =>
new Promise<string>((r) => {
resolve = r;
}),
);
client.setAuth(tokenFetcher2);

// Subscribe while paused and connecting
const unsubscribe = client
.watchQuery(anyApi.myQuery.default)
.onUpdate(() => {});

// Unsubscribe while paused and connecting
unsubscribe();

// Unpause
resolve!(jwtEncode({ iat: 1234550, exp: 1244550 }, "secret"));

expect((await receive()).type).toEqual("Connect");
expect((await receive()).type).toEqual("Authenticate");
expect(await receive()).toMatchObject({
type: "ModifyQuerySet",
baseVersion: 0,
modifications: [],
});
});
});
});

0 comments on commit 377349e

Please sign in to comment.