Skip to content

Commit

Permalink
revert: promise queue changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Pader committed Jul 12, 2023
1 parent 05797c0 commit e732494
Showing 1 changed file with 26 additions and 70 deletions.
96 changes: 26 additions & 70 deletions lib/Onyx.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,8 @@ const METHOD = {
CLEAR: 'clear',
};

// A map controlling concurrent storage modification.
// Each key is associated with a promise of an ongoing storage operation.
// This allows sequential chaining of overlapping writes to the same key,
// effectively preventing concurrent modification conflicts.
const pendingWriteOperations = {};

// When `Onyx.clear` gets called, we don't want other write operations to start
// before clearing is completed. This promise is resolved when clearing is done.
let pendingClearOperation = Promise.resolve();
// Key/value store of Onyx key and arrays of values to merge
const mergeQueue = {};

// Keeps track of the last connectionID that was used so we can keep incrementing it
let lastConnectionID = 0;
Expand Down Expand Up @@ -804,39 +797,6 @@ function notifyCollectionSubscribersOnNextTick(key, value) {
Promise.resolve().then(() => keysChanged(key, value));
}

/**
* This method waits for prior modifications to complete before running the next modification.
* A modification might be a call to `Onyx.set`, `Onyx.merge`, `Onyx.remove`, or any other Onyx method that changed data in storage
*
* @param {(String | null)} key
* @param {Function} nextModification
* @param {boolean} isClearOperation
* @returns {Promise}
*/
function waitPendingWrites(key, nextModification, isClearOperation = false) {
if (isClearOperation) {
const currentTasks = Promise.all([
pendingClearOperation,
...(_.values(pendingWriteOperations)),
]);

pendingClearOperation = currentTasks.finally(nextModification);

return pendingClearOperation;
}

if (key == null) { return Promise.resolve(); }

const currentTasks = Promise.all([
pendingClearOperation,
pendingWriteOperations[key],
]);

pendingWriteOperations[key] = currentTasks.finally(nextModification);

return pendingWriteOperations[key];
}

/**
* Remove a key from Onyx and update the subscribers
*
Expand All @@ -847,11 +807,7 @@ function waitPendingWrites(key, nextModification, isClearOperation = false) {
function remove(key) {
cache.drop(key);
notifySubscribersOnNextTick(key, null);

const nextModification = () => Storage.removeItem(key)
.catch(error => Logger.logAlert(`An error occurred while removing key: ${key}, Error: ${error}`));

return waitPendingWrites(key, nextModification);
return Storage.removeItem(key);
}

/**
Expand Down Expand Up @@ -926,14 +882,8 @@ function set(key, value) {
// We relay changes to subscribers optimistically to avoid lag (think UI updates)
notify(key, value, 'set');

const nextModification = () => Storage.setItem(key, value)
.catch((error) => {
// We have to resolve the current promise for the key, so that the next set doesn't block
pendingWriteOperations[key] = Promise.resolve();
return evictStorageAndRetry(error, set, key, value);
});

return waitPendingWrites(key, nextModification);
return Storage.setItem(key, value)
.catch(error => evictStorageAndRetry(error, set, key, value));
}

/**
Expand Down Expand Up @@ -1025,23 +975,30 @@ function applyMerge(existingValue, changes) {
* @returns {Promise}
*/
function merge(key, changes) {
const nextModification = () => get(key)
return get(key)
.then((existingValue) => {
try {
const modifiedData = applyMerge(existingValue, changes);

// We relay changes to subscribers optimistically to avoid lag (think UI updates)
notify(key, modifiedData, 'merge');
return Storage.mergeItem(key, changes, modifiedData);
} catch (error) {
Logger.logAlert(`An error occurred while applying merge for key: ${key}, Error: ${error}`);
}
const batchedChanges = /* mergeBatchedChanges(key); */ undefined;
const modifiedData = applyMerge(existingValue, batchedChanges);

return Promise.resolve();
// Clean up the write queue so we
// don't apply these changes again
delete mergeQueue[key];

// We relay changes to subscribers optimistically to avoid lag (think UI updates)
notify(key, modifiedData, 'merge');

return Storage.mergeItem(key, changes, modifiedData);
})
.catch(error => Logger.logAlert(`An error occurred while applying merge for key: ${key}, Error: ${error}`));
}

return waitPendingWrites(key, nextModification);
/**
* @private
* @param {String} key
* @returns {Boolean}
*/
function hasPendingMergeForKey(key) {
return Boolean(mergeQueue[key]);
}

/**
Expand Down Expand Up @@ -1083,7 +1040,7 @@ function initializeWithDefaultKeyStates() {
* @returns {Promise<void>}
*/
function clear(keysToPreserve = []) {
const nextModification = () => getAllKeys()
return getAllKeys()
.then((keys) => {
const keysToBeClearedFromStorage = [];
const keyValuesToResetAsCollection = {};
Expand Down Expand Up @@ -1141,8 +1098,6 @@ function clear(keysToPreserve = []) {
_.each(keysToBeClearedFromStorage, key => cache.drop(key));
return Storage.removeItems(keysToBeClearedFromStorage).then(() => Storage.multiSet(defaultKeyValuePairs));
});

return waitPendingWrites(undefined, nextModification, true);
}

/**
Expand Down Expand Up @@ -1340,6 +1295,7 @@ const Onyx = {
multiSet,
merge,
mergeCollection,
hasPendingMergeForKey,
update,
clear,
getAllKeys,
Expand Down

0 comments on commit e732494

Please sign in to comment.