Skip to content

Commit

Permalink
Breaking: only defer operations while db is opening
Browse files Browse the repository at this point in the history
In other states (besides 'open') a 'Database is not open' error is
now thrown. This reduces the scope of `deferred-leveldown` to what's
strictly needed in `levelup` and aligns behavior.

In addition, override public methods of `abstract-leveldown` instead
of private methods. This has one downside: they need to do the same
callback to promise conversion that `abstract-leveldown` does. The
upside is that operation callbacks are not called before the db has
finished opening, including in cases where `abstract-leveldown` has
a fast-path, like on `db.batch([])` which because the array is
empty bypasses `_batch()`.

Closes #91
Closes #90
  • Loading branch information
vweevers committed Sep 30, 2021
1 parent 713b51d commit 4fcf2c4
Show file tree
Hide file tree
Showing 6 changed files with 659 additions and 135 deletions.
28 changes: 28 additions & 0 deletions deferred-chained-batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const { AbstractChainedBatch } = require('abstract-leveldown')
const kOperations = Symbol('operations')

module.exports = class DeferredChainedBatch extends AbstractChainedBatch {
constructor (db) {
super(db)
this[kOperations] = []
}

_put (key, value, options) {
this[kOperations].push({ ...options, type: 'put', key, value })
}

_del (key, options) {
this[kOperations].push({ ...options, type: 'del', key })
}

_clear () {
this[kOperations] = []
}

_write (options, callback) {
// AbstractChainedBatch would call _batch(), we call batch()
this.db.batch(this[kOperations], options, callback)
}
}
77 changes: 61 additions & 16 deletions deferred-iterator.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,85 @@
'use strict'

const AbstractIterator = require('abstract-leveldown').AbstractIterator
const { AbstractIterator } = require('abstract-leveldown')
const inherits = require('inherits')
const getCallback = require('./util').getCallback

const kOptions = Symbol('options')
const kIterator = Symbol('iterator')
const kOperations = Symbol('operations')
const kPromise = Symbol('promise')

function DeferredIterator (db, options) {
AbstractIterator.call(this, db)

this._options = options
this._iterator = null
this._operations = []
this[kOptions] = options
this[kIterator] = null
this[kOperations] = []
}

inherits(DeferredIterator, AbstractIterator)

DeferredIterator.prototype.setDb = function (db) {
const it = this._iterator = db.iterator(this._options)
this[kIterator] = db.iterator(this[kOptions])

for (const op of this._operations) {
it[op.method](...op.args)
for (const op of this[kOperations].splice(0, this[kOperations].length)) {
this[kIterator][op.method](...op.args)
}
}

DeferredIterator.prototype._operation = function (method, args) {
if (this._iterator) return this._iterator[method](...args)
this._operations.push({ method, args })
}
DeferredIterator.prototype.next = function (...args) {
if (this.db.status === 'open') {
return this[kIterator].next(...args)
}

const callback = getCallback(args, kPromise, function map (key, value) {
if (key === undefined && value === undefined) {
return undefined
} else {
return [key, value]
}
})

for (const m of ['next', 'end']) {
DeferredIterator.prototype['_' + m] = function (...args) {
this._operation(m, args)
if (this.db.status === 'opening') {
this[kOperations].push({ method: 'next', args })
} else {
this._nextTick(callback, new Error('Database is not open'))
}

return callback[kPromise] || this
}

// Must defer seek() rather than _seek() because it requires db._serializeKey to be available
DeferredIterator.prototype.seek = function (...args) {
this._operation('seek', args)
if (this.db.status === 'open') {
this[kIterator].seek(...args)
} else if (this.db.status === 'opening') {
this[kOperations].push({ method: 'seek', args })
} else {
throw new Error('Database is not open')
}
}

DeferredIterator.prototype.end = function (...args) {
if (this.db.status === 'open') {
return this[kIterator].end(...args)
}

const callback = getCallback(args, kPromise)

if (this.db.status === 'opening') {
this[kOperations].push({ method: 'end', args })
} else {
this._nextTick(callback, new Error('Database is not open'))
}

return callback[kPromise] || this
}

for (const method of ['next', 'seek', 'end']) {
DeferredIterator.prototype['_' + method] = function () {
/* istanbul ignore next: assertion */
throw new Error('Did not expect private method to be called: ' + method)
}
}

module.exports = DeferredIterator
128 changes: 80 additions & 48 deletions deferred-leveldown.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
'use strict'

const AbstractLevelDOWN = require('abstract-leveldown').AbstractLevelDOWN
const { AbstractLevelDOWN } = require('abstract-leveldown')
const inherits = require('inherits')
const DeferredIterator = require('./deferred-iterator')
const DeferredChainedBatch = require('./deferred-chained-batch')
const getCallback = require('./util').getCallback

const deferrables = ['put', 'get', 'getMany', 'del', 'batch', 'clear']
const optionalDeferrables = ['approximateSize', 'compactRange']

const kInnerDb = Symbol('innerDb')
const kOperations = Symbol('operations')
const kPromise = Symbol('promise')

function DeferredLevelDOWN (db) {
AbstractLevelDOWN.call(this, db.supports || {})

Expand All @@ -17,84 +24,109 @@ function DeferredLevelDOWN (db) {
}
}

this._db = db
this._operations = []
this[kInnerDb] = db
this[kOperations] = []

closed(this)
implement(this)
}

inherits(DeferredLevelDOWN, AbstractLevelDOWN)

DeferredLevelDOWN.prototype.type = 'deferred-leveldown'

// Backwards compatibility for reachdown and subleveldown
Object.defineProperty(DeferredLevelDOWN.prototype, '_db', {
enumerable: true,
get () {
return this[kInnerDb]
}
})

DeferredLevelDOWN.prototype._open = function (options, callback) {
this._db.open(options, (err) => {
if (err) return callback(err)
const onopen = (err) => {
if (err || this[kInnerDb].status !== 'open') {
// TODO: reject scheduled operations
return callback(err || new Error('Database is not open'))
}

const operations = this[kOperations]
this[kOperations] = []

for (const op of this._operations) {
for (const op of operations) {
if (op.iterator) {
op.iterator.setDb(this._db)
op.iterator.setDb(this[kInnerDb])
} else {
this._db[op.method](...op.args)
this[kInnerDb][op.method](...op.args)
}
}

this._operations = []
/* istanbul ignore if: assertion */
if (this[kOperations].length > 0) {
throw new Error('Did not expect further operations')
}

open(this)
callback()
})
}

if (this[kInnerDb].status === 'new' || this[kInnerDb].status === 'closed') {
this[kInnerDb].open(options, onopen)
} else {
this._nextTick(onopen)
}
}

DeferredLevelDOWN.prototype._close = function (callback) {
this._db.close((err) => {
if (err) return callback(err)
closed(this)
callback()
})
this[kInnerDb].close(callback)
}

function open (self) {
for (const m of deferrables.concat('iterator')) {
self['_' + m] = function (...args) {
return this._db[m](...args)
}
}

for (const m of Object.keys(self.supports.additionalMethods)) {
self[m] = function (...args) {
return this._db[m](...args)
}
}
DeferredLevelDOWN.prototype._isOperational = function () {
return this.status === 'opening'
}

function closed (self) {
for (const m of deferrables) {
self['_' + m] = function (...args) {
this._operations.push({ method: m, args })
function implement (self) {
const additionalMethods = Object.keys(self.supports.additionalMethods)

for (const method of deferrables.concat(additionalMethods)) {
// Override the public rather than private methods to cover cases where abstract-leveldown
// has a fast-path like on db.batch([]) which bypasses _batch() because the array is empty.
self[method] = function (...args) {
if (method === 'batch' && args.length === 0) {
return new DeferredChainedBatch(this)
} else if (this.status === 'open') {
return this[kInnerDb][method](...args)
}

const callback = getCallback(args, kPromise)

if (this.status === 'opening') {
this[kOperations].push({ method, args })
} else {
this._nextTick(callback, new Error('Database is not open'))
}

return callback[kPromise]
}
}

for (const m of Object.keys(self.supports.additionalMethods)) {
self[m] = function (...args) {
this._operations.push({ method: m, args })
self.iterator = function (options) {
if (this.status === 'open') {
return this[kInnerDb].iterator(options)
} else if (this.status === 'opening') {
const iterator = new DeferredIterator(this, options)
this[kOperations].push({ iterator })
return iterator
} else {
throw new Error('Database is not open')
}
}

self._iterator = function (options) {
const it = new DeferredIterator(self, options)
this._operations.push({ iterator: it })
return it
for (const method of deferrables.concat(['iterator'])) {
self['_' + method] = function () {
/* istanbul ignore next: assertion */
throw new Error('Did not expect private method to be called: ' + method)
}
}
}

DeferredLevelDOWN.prototype._serializeKey = function (key) {
return key
}

DeferredLevelDOWN.prototype._serializeValue = function (value) {
return value
}

module.exports = DeferredLevelDOWN
module.exports.DeferredIterator = DeferredIterator
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
"prepublishOnly": "npm run dependency-check"
},
"files": [
"deferred-chained-batch.js",
"deferred-iterator.js",
"deferred-leveldown.js",
"util.js",
"CHANGELOG.md",
"LICENSE.md",
"UPGRADING.md"
Expand Down
Loading

0 comments on commit 4fcf2c4

Please sign in to comment.