Merge pull request #47913 from gedu/gedu/replace_reconnectapp_most_updated

De-dupe ReconnectApp in the persisted requests queue
roryabraham authored Oct 2, 2024
2 parents d075536 + 0877308 commit bad93d5
Showing 12 changed files with 658 additions and 95 deletions.
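The change threads a conflict-resolution hook through the API layer: a write command can now inspect the persisted request queue and decide whether its new request should be appended, replace an existing entry, or be dropped. The type itself lives in src/types/onyx/Request.ts, one of the files not shown in this excerpt; the sketch below is an assumed, minimal version of that contract, inferred from how the hunks below use it ('push' and 'replace' appear in the SequentialQueue code, the fall-through variant name is a guess):

```ts
import type OnyxRequest from '@src/types/onyx/Request';

// Assumed sketch of the conflict-resolver contract; the real definition lives in
// src/types/onyx/Request.ts, which is not included in this excerpt.
type ConflictAction =
    | {type: 'push'} // no conflict found: append the new request to the queue
    | {type: 'replace'; index: number} // overwrite the conflicting request already in the queue
    | {type: 'noAction'}; // drop the new request entirely (variant name is illustrative)

type RequestConflictResolver = {
    /** Given the currently persisted requests, decide what to do with the incoming one. */
    checkAndFixConflictingRequest?: (persistedRequests: OnyxRequest[]) => {conflictAction: ConflictAction};
};
```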
2 changes: 2 additions & 0 deletions src/ONYXKEYS.ts
@@ -32,6 +32,7 @@ const ONYXKEYS = {

/** Note: These are Persisted Requests - not all requests in the main queue as the key name might lead one to believe */
PERSISTED_REQUESTS: 'networkRequestQueue',
PERSISTED_ONGOING_REQUESTS: 'networkOngoingRequestQueue',

/** Stores current date */
CURRENT_DATE: 'currentDate',
@@ -855,6 +856,7 @@ type OnyxValuesMapping = {
[ONYXKEYS.DEVICE_ID]: string;
[ONYXKEYS.IS_SIDEBAR_LOADED]: boolean;
[ONYXKEYS.PERSISTED_REQUESTS]: OnyxTypes.Request[];
[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: OnyxTypes.Request;
[ONYXKEYS.CURRENT_DATE]: string;
[ONYXKEYS.CREDENTIALS]: OnyxTypes.Credentials;
[ONYXKEYS.STASHED_CREDENTIALS]: OnyxTypes.Credentials;
21 changes: 17 additions & 4 deletions src/libs/API/index.ts
@@ -9,7 +9,7 @@ import * as Request from '@libs/Request';
import * as PersistedRequests from '@userActions/PersistedRequests';
import CONST from '@src/CONST';
import type OnyxRequest from '@src/types/onyx/Request';
import type {PaginatedRequest, PaginationConfig} from '@src/types/onyx/Request';
import type {PaginatedRequest, PaginationConfig, RequestConflictResolver} from '@src/types/onyx/Request';
import type Response from '@src/types/onyx/Response';
import type {ApiCommand, ApiRequestCommandParameters, ApiRequestType, CommandOfType, ReadCommand, SideEffectRequestCommand, WriteCommand} from './types';

@@ -45,7 +45,13 @@ type OnyxData = {
/**
* Prepare the request to be sent. Bind data together with request metadata and apply optimistic Onyx data.
*/
function prepareRequest<TCommand extends ApiCommand>(command: TCommand, type: ApiRequestType, params: ApiRequestCommandParameters[TCommand], onyxData: OnyxData = {}): OnyxRequest {
function prepareRequest<TCommand extends ApiCommand>(
command: TCommand,
type: ApiRequestType,
params: ApiRequestCommandParameters[TCommand],
onyxData: OnyxData = {},
conflictResolver: RequestConflictResolver = {},
): OnyxRequest {
Log.info('[API] Preparing request', false, {command, type});

const {optimisticData, ...onyxDataWithoutOptimisticData} = onyxData;
@@ -71,6 +77,7 @@ function prepareRequest<TCommand extends ApiCommand>(command: TCommand, type: Ap
command,
data,
...onyxDataWithoutOptimisticData,
...conflictResolver,
};

if (isWriteRequest) {
@@ -116,9 +123,15 @@ function processRequest(request: OnyxRequest, type: ApiRequestType): Promise<voi
* @param [onyxData.failureData] - Onyx instructions that will be passed to Onyx.update() when the response has jsonCode !== 200.
* @param [onyxData.finallyData] - Onyx instructions that will be passed to Onyx.update() when the response has jsonCode === 200 or jsonCode !== 200.
*/
function write<TCommand extends WriteCommand>(command: TCommand, apiCommandParameters: ApiRequestCommandParameters[TCommand], onyxData: OnyxData = {}): Promise<void | Response> {

function write<TCommand extends WriteCommand>(
command: TCommand,
apiCommandParameters: ApiRequestCommandParameters[TCommand],
onyxData: OnyxData = {},
conflictResolver: RequestConflictResolver = {},
): Promise<void | Response> {
Log.info('[API] Called API write', false, {command, ...apiCommandParameters});
const request = prepareRequest(command, CONST.API_REQUEST_TYPE.WRITE, apiCommandParameters, onyxData);
const request = prepareRequest(command, CONST.API_REQUEST_TYPE.WRITE, apiCommandParameters, onyxData, conflictResolver);
return processRequest(request, CONST.API_REQUEST_TYPE.WRITE);
}

23 changes: 15 additions & 8 deletions src/libs/Middleware/HandleUnusedOptimisticID.ts
@@ -34,14 +34,21 @@ const handleUnusedOptimisticID: Middleware = (requestResponse, request, isFromSe
return;
}
const oldReportID = request.data?.reportID;
const offset = isFromSequentialQueue ? 1 : 0;
PersistedRequests.getAll()
.slice(offset)
.forEach((persistedRequest, index) => {
const persistedRequestClone = clone(persistedRequest);
persistedRequestClone.data = deepReplaceKeysAndValues(persistedRequest.data, oldReportID as string, preexistingReportID);
PersistedRequests.update(index + offset, persistedRequestClone);
});

if (isFromSequentialQueue) {
const ongoingRequest = PersistedRequests.getOngoingRequest();
if (ongoingRequest && ongoingRequest.data?.reportID === oldReportID) {
const ongoingRequestClone = clone(ongoingRequest);
ongoingRequestClone.data = deepReplaceKeysAndValues(ongoingRequest.data, oldReportID as string, preexistingReportID);
PersistedRequests.updateOngoingRequest(ongoingRequestClone);
}
}

PersistedRequests.getAll().forEach((persistedRequest, index) => {
const persistedRequestClone = clone(persistedRequest);
persistedRequestClone.data = deepReplaceKeysAndValues(persistedRequest.data, oldReportID as string, preexistingReportID);
PersistedRequests.update(index, persistedRequestClone);
});
});
return response;
});
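Both branches above rely on @libs/deepReplaceKeysAndValues to swap the optimistic reportID for the server's pre-existing one wherever it occurs in a request payload. That helper is not part of this diff; the following is a simplified re-implementation sketched from how it is called here (the matching rules of the real helper may differ, for example for IDs embedded inside longer keys):

```ts
// Simplified sketch of a deep key/value replacement; the real helper lives in
// src/libs/deepReplaceKeysAndValues.ts and may handle more cases.
type Replaceable = string | number | boolean | null | undefined | Replaceable[] | {[key: string]: Replaceable};

function deepReplaceKeysAndValuesSketch(target: Replaceable, oldValue: string, newValue: string): Replaceable {
    if (typeof target === 'string') {
        // Replace exact string matches (e.g. an optimistic reportID used as a value)
        return target === oldValue ? newValue : target;
    }
    if (Array.isArray(target)) {
        return target.map((item) => deepReplaceKeysAndValuesSketch(item, oldValue, newValue));
    }
    if (target !== null && typeof target === 'object') {
        const replaced: {[key: string]: Replaceable} = {};
        for (const [key, value] of Object.entries(target)) {
            // Keys that match the old ID are renamed too
            const newKey = key === oldValue ? newValue : key;
            replaced[newKey] = deepReplaceKeysAndValuesSketch(value, oldValue, newValue);
        }
        return replaced;
    }
    return target;
}
```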
48 changes: 36 additions & 12 deletions src/libs/Network/SequentialQueue.ts
@@ -25,7 +25,7 @@ let isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise?.();

let isSequentialQueueRunning = false;
let currentRequest: Promise<void> | null = null;
let currentRequestPromise: Promise<void> | null = null;
let isQueuePaused = false;

/**
@@ -80,20 +80,22 @@ function process(): Promise<void> {
return Promise.resolve();
}

const requestToProcess = persistedRequests.at(0);
const requestToProcess = PersistedRequests.processNextRequest();
if (!requestToProcess) {
Log.info('[SequentialQueue] Unable to process. No next request to handle.');
return Promise.resolve();
}

// Set currentRequestPromise to a promise awaiting the request's processing so that getCurrentRequest can be used to take some action after the current request has processed.
currentRequest = Request.processWithMiddleware(requestToProcess, true)
currentRequestPromise = Request.processWithMiddleware(requestToProcess, true)
.then((response) => {
// A response might indicate that the queue should be paused. This happens when a gap in Onyx updates is detected between the client and the server and
// that gap needs to be resolved before the queue can continue.
if (response?.shouldPauseQueue) {
Log.info("[SequentialQueue] Handled 'shouldPauseQueue' in response. Pausing the queue.");
pause();
}

PersistedRequests.remove(requestToProcess);
RequestThrottle.clear();
return process();
@@ -106,6 +108,7 @@ function process(): Promise<void> {
RequestThrottle.clear();
return process();
}
PersistedRequests.rollbackOngoingRequest();
return RequestThrottle.sleep(error, requestToProcess.command)
.then(process)
.catch(() => {
@@ -116,7 +119,7 @@ function process(): Promise<void> {
});
});

return currentRequest;
return currentRequestPromise;
}

function flush() {
@@ -164,7 +167,8 @@ function flush() {
if (NetworkStore.isOffline() || PersistedRequests.getAll().length === 0) {
resolveIsReadyPromise?.();
}
currentRequest = null;
currentRequestPromise = null;

// The queue can be paused when we sync the data with backend so we should only update the Onyx data when the queue is empty
if (PersistedRequests.getAll().length === 0) {
flushOnyxUpdatesQueue();
@@ -184,7 +188,7 @@ function unpause() {
}

const numberOfPersistedRequests = PersistedRequests.getAll().length || 0;
console.debug(`[SequentialQueue] Unpausing the queue and flushing ${numberOfPersistedRequests} requests`);
Log.info(`[SequentialQueue] Unpausing the queue and flushing ${numberOfPersistedRequests} requests`);
isQueuePaused = false;
flush();
}
@@ -200,9 +204,29 @@ function isPaused(): boolean {
// Flush the queue when the connection resumes
NetworkStore.onReconnection(flush);

function push(request: OnyxRequest) {
// Add request to Persisted Requests so that it can be retried if it fails
PersistedRequests.save(request);
function push(newRequest: OnyxRequest) {
const {checkAndFixConflictingRequest} = newRequest;

if (checkAndFixConflictingRequest) {
const requests = PersistedRequests.getAll();
const {conflictAction} = checkAndFixConflictingRequest(requests);
Log.info(`[SequentialQueue] Conflict action for command ${newRequest.command} - ${conflictAction.type}:`);

// Don't try to serialize a function; the resolver can't be persisted with the request.
// eslint-disable-next-line no-param-reassign
delete newRequest.checkAndFixConflictingRequest;

if (conflictAction.type === 'push') {
PersistedRequests.save(newRequest);
} else if (conflictAction.type === 'replace') {
PersistedRequests.update(conflictAction.index, newRequest);
} else {
Log.info(`[SequentialQueue] No action performed for command ${newRequest.command}; it will be ignored.`);
}
} else {
// Add request to Persisted Requests so that it can be retried if it fails
PersistedRequests.save(newRequest);
}

// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
if (NetworkStore.isOffline()) {
@@ -219,10 +243,10 @@ function push(request: OnyxRequest) {
}

function getCurrentRequest(): Promise<void> {
if (currentRequest === null) {
if (currentRequestPromise === null) {
return Promise.resolve();
}
return currentRequest;
return currentRequestPromise;
}

/**
Expand All @@ -232,5 +256,5 @@ function waitForIdle(): Promise<unknown> {
return isReadyPromise;
}

export {flush, getCurrentRequest, isRunning, isPaused, push, waitForIdle, pause, unpause};
export {flush, getCurrentRequest, isRunning, isPaused, push, waitForIdle, pause, unpause, process};
export type {RequestError};
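With these changes, process() follows an explicit lifecycle for the request at the head of the queue: processNextRequest() moves it into the ongoing slot, a successful response removes it, and a retryable failure rolls it back to the front before the throttled retry. A stripped-down sketch of that flow (error classification, throttling, and pause handling omitted; this is not the actual implementation):

```ts
import * as Request from '@libs/Request';
import * as PersistedRequests from '@userActions/PersistedRequests';

// Stripped-down sketch of the lifecycle process() drives; not the real implementation.
function processHeadOfQueue(): Promise<void> {
    // Moves the head of the queue into the ongoing slot (persisting it when persistWhenOngoing is set)
    const requestToProcess = PersistedRequests.processNextRequest();
    if (!requestToProcess) {
        return Promise.resolve();
    }

    return Request.processWithMiddleware(requestToProcess, true)
        .then(() => {
            // Success: drop the request and clear the ongoing slot
            PersistedRequests.remove(requestToProcess);
        })
        .catch(() => {
            // Retryable failure: put the request back at the front of the queue so it runs again
            PersistedRequests.rollbackOngoingRequest();
        });
}
```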
1 change: 0 additions & 1 deletion src/libs/ReportActionsUtils.ts
@@ -58,7 +58,6 @@ Onyx.connect({
if (!actions) {
return;
}

allReportActions = actions;
},
});
20 changes: 19 additions & 1 deletion src/libs/actions/App.ts
@@ -273,7 +273,25 @@ function reconnectApp(updateIDFrom: OnyxEntry<number> = 0) {
params.updateIDFrom = updateIDFrom;
}

API.write(WRITE_COMMANDS.RECONNECT_APP, params, getOnyxDataForOpenOrReconnect());
API.write(WRITE_COMMANDS.RECONNECT_APP, params, getOnyxDataForOpenOrReconnect(), {
checkAndFixConflictingRequest: (persistedRequests) => {
const index = persistedRequests.findIndex((request) => request.command === WRITE_COMMANDS.RECONNECT_APP);
if (index === -1) {
return {
conflictAction: {
type: 'push',
},
};
}

return {
conflictAction: {
type: 'replace',
index,
},
};
},
});
});
}
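The net effect is that at most one ReconnectApp request can sit in the persisted queue: if one is already queued, the newer call replaces it, keeping the latest updateIDFrom, instead of appending a duplicate. A test-style sketch of that behaviour while the client is offline (illustrative only, not a test from this PR; the import paths and the waitForBatchedUpdates helper are assumptions modelled on the project's existing test utilities):

```ts
import * as App from '@userActions/App';
import * as PersistedRequests from '@userActions/PersistedRequests';
import {WRITE_COMMANDS} from '@libs/API/types';
import waitForBatchedUpdates from '../utils/waitForBatchedUpdates';

// Illustrative only — assumes the network has been forced offline so writes are persisted.
test('ReconnectApp is de-duped in the persisted queue', async () => {
    App.reconnectApp(100);
    await waitForBatchedUpdates();
    App.reconnectApp(200); // a second reconnect while the first is still queued
    await waitForBatchedUpdates();

    const queued = PersistedRequests.getAll().filter((request) => request.command === WRITE_COMMANDS.RECONNECT_APP);
    expect(queued.length).toBe(1); // de-duped: only one ReconnectApp is persisted
    expect(queued.at(0)?.data?.updateIDFrom).toBe(200); // and it carries the latest updateIDFrom
});
```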

87 changes: 80 additions & 7 deletions src/libs/actions/PersistedRequests.ts
@@ -5,24 +5,47 @@ import ONYXKEYS from '@src/ONYXKEYS';
import type {Request} from '@src/types/onyx';

let persistedRequests: Request[] = [];
let ongoingRequest: Request | null = null;

Onyx.connect({
key: ONYXKEYS.PERSISTED_REQUESTS,
callback: (val) => (persistedRequests = val ?? []),
callback: (val) => {
persistedRequests = val ?? [];

if (ongoingRequest && persistedRequests.length > 0) {
const nextRequestToProcess = persistedRequests.at(0);

// We try to remove the next request from the persistedRequests if it is the same as ongoingRequest
// so we don't process it twice.
if (isEqual(nextRequestToProcess, ongoingRequest)) {
persistedRequests = persistedRequests.slice(1);
}
}
},
});
Onyx.connect({
key: ONYXKEYS.PERSISTED_ONGOING_REQUESTS,
callback: (val) => {
ongoingRequest = val ?? null;
},
});

/**
* This promise is only used by tests. DO NOT USE THIS PROMISE IN THE APPLICATION CODE
*/
function clear() {
ongoingRequest = null;
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, null);
return Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, []);
}

function getLength(): number {
return persistedRequests.length;
// Count the ongoing request as well so the reported length stays backwards compatible with the old single-queue implementation
return persistedRequests.length + (ongoingRequest ? 1 : 0);
}

function save(requestToPersist: Request) {
// Append the new request to the end of the persisted queue
const requests = [...persistedRequests, requestToPersist];
persistedRequests = requests;
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests).then(() => {
@@ -31,18 +54,24 @@ function save(requestToPersist: Request) {
}

function remove(requestToRemove: Request) {
ongoingRequest = null;
/**
* We only remove the first matching request because the order of requests matters.
* If we were to remove all matching requests, we can end up with a final state that is different than what the user intended.
*/
const requests = [...persistedRequests];
const index = requests.findIndex((persistedRequest) => isEqual(persistedRequest, requestToRemove));
if (index === -1) {
return;

if (index !== -1) {
requests.splice(index, 1);
}
requests.splice(index, 1);

persistedRequests = requests;
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests).then(() => {

Onyx.multiSet({
[ONYXKEYS.PERSISTED_REQUESTS]: persistedRequests,
[ONYXKEYS.PERSISTED_ONGOING_REQUESTS]: null,
}).then(() => {
Log.info(`[SequentialQueue] '${requestToRemove.command}' removed from the queue. Queue length is ${getLength()}`);
});
}
@@ -54,8 +83,52 @@ function update(oldRequestIndex: number, newRequest: Request) {
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests);
}

function updateOngoingRequest(newRequest: Request) {
ongoingRequest = newRequest;

if (newRequest.persistWhenOngoing) {
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, newRequest);
}
}

function processNextRequest(): Request | null {
if (ongoingRequest) {
Log.info(`Ongoing request already set, returning the same one: ${ongoingRequest.command}`);
return ongoingRequest;
}

// Callers are expected to check the queue length first; fail loudly if there is nothing to process
if (persistedRequests.length === 0) {
throw new Error('No requests to process');
}

ongoingRequest = persistedRequests.shift() ?? null;

if (ongoingRequest && ongoingRequest.persistWhenOngoing) {
Onyx.set(ONYXKEYS.PERSISTED_ONGOING_REQUESTS, ongoingRequest);
}

return ongoingRequest;
}

function rollbackOngoingRequest() {
if (!ongoingRequest) {
return;
}

// Prepend ongoingRequest to persistedRequests
persistedRequests.unshift(ongoingRequest);

// Clear the ongoingRequest
ongoingRequest = null;
}

function getAll(): Request[] {
return persistedRequests;
}

export {clear, save, getAll, remove, update, getLength};
function getOngoingRequest(): Request | null {
return ongoingRequest;
}

export {clear, save, getAll, remove, update, getLength, getOngoingRequest, processNextRequest, updateOngoingRequest, rollbackOngoingRequest};
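The ongoing-request slot exists so a request that was mid-flight when the app was killed is neither lost nor run twice: requests flagged with persistWhenOngoing are mirrored to ONYXKEYS.PERSISTED_ONGOING_REQUESTS while they run, and on reload the PERSISTED_REQUESTS connect callback above drops the matching head of the queue. A small sketch of the flag from a caller's point of view (the command, payload, and the decision to opt into persistWhenOngoing are assumptions made for this example; whether ReconnectApp itself sets the flag is not visible in this excerpt):

```ts
import * as PersistedRequests from '@userActions/PersistedRequests';
import type Request from '@src/types/onyx/Request';

// Illustrative request — command, payload, and the persistWhenOngoing opt-in are assumed here.
const request: Request = {
    command: 'ReconnectApp',
    data: {updateIDFrom: 100},
    persistWhenOngoing: true,
};

PersistedRequests.save(request);

// Taking the request for processing moves it out of the main queue and, because
// persistWhenOngoing is set, mirrors it to ONYXKEYS.PERSISTED_ONGOING_REQUESTS.
const ongoing = PersistedRequests.processNextRequest();

// If the app is killed here, the next launch restores the ongoing request from Onyx, and the
// PERSISTED_REQUESTS connect callback above drops the duplicate at the head of the queue —
// so the command is retried exactly once.
// On success, SequentialQueue calls PersistedRequests.remove(ongoing); on a retryable failure
// it calls PersistedRequests.rollbackOngoingRequest() instead.
```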