Skip to content

Commit

Permalink
Abort onSnapshotListeners on terminate.
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-andersen committed Jul 29, 2024
1 parent b9244a5 commit 085499b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
1 change: 1 addition & 0 deletions packages/firestore/src/core/component_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,5 +485,6 @@ export class OnlineComponentProvider {
async terminate(): Promise<void> {
await remoteStoreShutdown(this.remoteStore);
this.datastore?.terminate();
this.eventManager?.terminate();
}
}
38 changes: 32 additions & 6 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import { debugAssert, debugCast } from '../util/assert';
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
import { FirestoreError } from '../util/error';
import { Code, FirestoreError } from '../util/error';
import { EventHandler } from '../util/misc';
import { ObjectMap } from '../util/obj_map';

Expand Down Expand Up @@ -64,19 +64,17 @@ export interface EventManager {
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
terminate(): void;
}

export function newEventManager(): EventManager {
return new EventManagerImpl();
}

export class EventManagerImpl implements EventManager {
queries = new ObjectMap<Query, QueryListenersInfo>(
q => canonifyQuery(q),
queryEquals
);
queries: ObjectMap<Query, QueryListenersInfo> = newQueriesObjectMap();

onlineState = OnlineState.Unknown;
onlineState: OnlineState = OnlineState.Unknown;

snapshotsInSyncListeners: Set<Observer<void>> = new Set();

Expand All @@ -98,6 +96,20 @@ export class EventManagerImpl implements EventManager {
* still listening to the cache.
*/
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;

terminate(): void {
errorAllTargets(
this,
new FirestoreError(Code.ABORTED, 'Firestore shutting down')
);
}
}

function newQueriesObjectMap(): ObjectMap<Query, QueryListenersInfo> {
return new ObjectMap<Query, QueryListenersInfo>(
q => canonifyQuery(q),
queryEquals
);
}

function validateEventManager(eventManagerImpl: EventManagerImpl): void {
Expand Down Expand Up @@ -334,6 +346,20 @@ export function removeSnapshotsInSyncListener(
eventManagerImpl.snapshotsInSyncListeners.delete(observer);
}

function errorAllTargets(eventManager: EventManager, error: FirestoreError) {

Check failure on line 349 in packages/firestore/src/core/event_manager.ts

View workflow job for this annotation

GitHub Actions / Lint

Missing return type on function
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
const queries = eventManagerImpl.queries;

// Prevent further access by clearing ObjectMap.
eventManagerImpl.queries = newQueriesObjectMap();

queries.forEach((_, queryInfo) => {
for (const listener of queryInfo.listeners) {
listener.onError(error);
}
});
}

// Call all global snapshot listeners that have been set.
function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void {
eventManagerImpl.snapshotsInSyncListeners.forEach(observer => {
Expand Down
19 changes: 18 additions & 1 deletion packages/firestore/test/integration/api/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ import {
FieldPath,
newTestFirestore,
SnapshotOptions,
newTestApp
newTestApp,
FirestoreError
} from '../util/firebase_export';
import {
apiDescribe,
Expand Down Expand Up @@ -1442,6 +1443,22 @@ apiDescribe('Database', persistence => {
});
});

it('query listener throws error on termination', async () => {
return withTestDoc(persistence, async (docRef, firestore) => {
const deferred: Deferred<FirestoreError> = new Deferred();
const unsubscribe = onSnapshot(docRef, snapshot => {}, deferred.resolve);

await terminate(firestore);

await expect(deferred.promise)
.to.eventually.haveOwnProperty('message')
.equal('Firestore shutting down');

// Call should proceed without error.
unsubscribe();
});
});

it('can wait for pending writes', async () => {
await withTestDoc(persistence, async (docRef, firestore) => {
// Prevent pending writes receiving acknowledgement.
Expand Down

0 comments on commit 085499b

Please sign in to comment.