diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 41bd8e2..956b2bc 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - node-version: [14.x, 16.x] + node-version: [16.17.1, 18.17.1, 20.x] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: diff --git a/.nvmrc b/.nvmrc index 8351c19..209e3ef 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -14 +20 diff --git a/README.md b/README.md index adcf540..498f9b0 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,10 @@ hypercores already in the indexer. Stop the indexer and flush index state to storage. This will not close the underlying storage - it is up to the consumer to do that. +### indexer.unlink() + +Unlink all index files. This should only be called after `close()` has resolved. + ### indexer.on('index-state', onState) #### onState diff --git a/index.js b/index.js index 73ac91e..00dedba 100644 --- a/index.js +++ b/index.js @@ -3,7 +3,6 @@ const { Writable } = require('streamx') const { TypedEmitter } = require('tiny-typed-emitter') const { once } = require('events') const raf = require('random-access-file') -// const log = require('debug')('multi-core-indexer') const { CoreIndexStream } = require('./lib/core-index-stream') const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream') const { pDefer } = require('./lib/utils.js') @@ -12,9 +11,6 @@ const DEFAULT_BATCH_SIZE = 100 // The indexing rate (in entries per second) is calculated as an exponential // moving average. A factor > 1 will put more weight on previous values. const MOVING_AVG_FACTOR = 5 -const kHandleEntries = Symbol('handleEntries') -const kEmitState = Symbol('emitState') -const kGetState = Symbol('getState') /** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */ /** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */ @@ -35,7 +31,6 @@ class MultiCoreIndexer extends TypedEmitter { #batch /** @type {import('./lib/types').IndexStateCurrent} */ #state = 'indexing' - #lastRemaining = -1 #rateMeasurementStart = Date.now() #rate = 0 #createStorage @@ -66,15 +61,14 @@ class MultiCoreIndexer extends TypedEmitter { this.#writeStream = /** @type {Writable>} */ ( new Writable({ writev: (entries, cb) => { - // @ts-ignore - I don't know why TS does not like this - this[kHandleEntries](entries).then(() => cb(), cb) + this.#handleEntries(entries).then(() => cb(), cb) }, highWaterMark: maxBatch, byteLength: () => 1, }) ) this.#indexStream.pipe(this.#writeStream) - this.#emitStateBound = this[kEmitState].bind(this) + this.#emitStateBound = this.#emitState.bind(this) // This is needed because the source streams can start indexing before this // stream starts reading data. This ensures that the indexing state is // emitted when the source cores first append / download data @@ -88,7 +82,7 @@ class MultiCoreIndexer extends TypedEmitter { * @type {IndexState} */ get state() { - return this[kGetState]() + return this.#getState() } /** @@ -104,7 +98,7 @@ class MultiCoreIndexer extends TypedEmitter { * Resolves when indexing state is 'idle' */ async idle() { - if (this[kGetState]().current === 'idle') return + if (this.#getState().current === 'idle') return if (!this.#pendingIdle) { this.#pendingIdle = pDefer() } @@ -122,9 +116,16 @@ class MultiCoreIndexer extends TypedEmitter { ]) } + /** + * Unlink all index files. This should only be called after `close()` has resolved. 
+ */ + async unlink() { + await this.#indexStream.unlink() + } + /** @param {Entry[]} entries */ - async [kHandleEntries](entries) { - this[kEmitState]() + async #handleEntries(entries) { + this.#emitState() /* istanbul ignore if - not sure this is necessary, but better safe than sorry */ if (!entries.length) return await this.#batch(entries) @@ -140,11 +141,11 @@ class MultiCoreIndexer extends TypedEmitter { // Set this at the end of batch rather than start so the timing also // includes the reads from the index streams this.#rateMeasurementStart = Date.now() - this[kEmitState]() + this.#emitState() } - [kEmitState]() { - const state = this[kGetState]() + #emitState() { + const state = this.#getState() if (state.current !== this.#prevEmittedState?.current) { this.emit(state.current) } @@ -155,7 +156,7 @@ class MultiCoreIndexer extends TypedEmitter { this.#prevEmittedState = state } - [kGetState]() { + #getState() { const remaining = this.#indexStream.remaining const drained = this.#indexStream.drained const prevState = this.#state diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 3c4f35e..6ef8fe8 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -4,13 +4,6 @@ const Bitfield = require('./bitfield') const { pDefer } = require('./utils') const { promisify } = require('node:util') -const kReadPromise = Symbol('readPromise') -const kOpenPromise = Symbol('openPromise') -const kDestroyPromise = Symbol('destroyPromise') -const kHandleAppend = Symbol('handleAppend') -const kHandleDownload = Symbol('handleDownload') -const kPushEntry = Symbol('pushEntry') - /** @typedef {import('./types').ValueEncoding} ValueEncoding */ /** @typedef {import('./types').JSONValue} JSONValue */ /** @@ -30,6 +23,8 @@ const kPushEntry = Symbol('pushEntry') * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class CoreIndexStream extends Readable { + #handleAppendBound + #handleDownloadBound /** @type {Bitfield | undefined} */ #indexedBitfield /** @type {Bitfield | undefined} */ @@ -60,9 +55,15 @@ class CoreIndexStream extends Readable { byteLength: () => 1, }) this.#core = core - this.#createStorage = createStorage - this[kHandleAppend] = this[kHandleAppend].bind(this) - this[kHandleDownload] = this[kHandleDownload].bind(this) + this.#handleAppendBound = this.#handleAppend.bind(this) + this.#handleDownloadBound = this.#handleDownload.bind(this) + this.#createStorage = async () => { + await this.#core.ready() + const { discoveryKey } = this.#core + /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */ + if (!discoveryKey) throw new Error('Missing discovery key') + return createStorage(getStorageName(discoveryKey)) + } } get remaining() { @@ -81,12 +82,12 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _open(cb) { - this[kOpenPromise]().then(cb, cb) + this.#open().then(cb, cb) } /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -96,7 +97,7 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } /** @@ -110,27 +111,29 @@ class CoreIndexStream extends Readable { this.#inProgressBitfield?.set(index, false) } - async [kDestroyPromise]() { - this.#core.removeListener('append', this[kHandleAppend]) - this.#core.removeListener('download', this[kHandleDownload]) + async unlink() { + this.#storage ??= await this.#createStorage() + await 
unlinkStorage(this.#storage) + } + + async #destroy() { + this.#core.removeListener('append', this.#handleAppendBound) + this.#core.removeListener('download', this.#handleDownloadBound) await this.#indexedBitfield?.flush() if (this.#storage) await closeStorage(this.#storage) } - async [kOpenPromise]() { + async #open() { await this.#core.ready() await this.#core.update({ wait: true }) - const { discoveryKey } = this.#core - /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */ - if (!discoveryKey) throw new Error('Missing discovery key') - this.#storage = this.#createStorage(getStorageName(discoveryKey)) + this.#storage ??= await this.#createStorage() this.#indexedBitfield = await Bitfield.open(this.#storage) this.#inProgressBitfield = await new Bitfield() - this.#core.on('append', this[kHandleAppend]) - this.#core.on('download', this[kHandleDownload]) + this.#core.on('append', this.#handleAppendBound) + this.#core.on('download', this.#handleDownloadBound) } - async [kReadPromise]() { + async #read() { if (this.#index >= this.#core.length && this.#downloaded.size === 0) { this.#drained = true this.emit('drained') @@ -142,7 +145,7 @@ class CoreIndexStream extends Readable { let didPush = false this.#readBufferAvailable = true while (this.#readBufferAvailable && this.#index < this.#core.length) { - didPush = (await this[kPushEntry](this.#index)) || didPush + didPush = (await this.#pushEntry(this.#index)) || didPush // Don't increment this until after the async push above this.#index++ } @@ -151,7 +154,7 @@ class CoreIndexStream extends Readable { for (const index of this.#downloaded) { this.#downloaded.delete(index) didPush = - (await this[kPushEntry](index)) || + (await this.#pushEntry(index)) || /* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */ didPush // This is for back-pressure, for which there is not a good test yet. 
@@ -166,7 +169,7 @@ class CoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } await this.#indexedBitfield?.flush() } @@ -177,7 +180,7 @@ class CoreIndexStream extends Readable { * @param {number} index * @returns {Promise} */ - async [kPushEntry](index) { + async #pushEntry(index) { const isProcessed = this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index) if (isProcessed) return false @@ -192,14 +195,14 @@ class CoreIndexStream extends Readable { return true } - async [kHandleAppend]() { + async #handleAppend() { this.#pending.resolve() } /** * @param {number} index */ - async [kHandleDownload](index) { + async #handleDownload(index) { this.#downloaded.add(index) this.#pending.resolve() } @@ -217,3 +220,8 @@ function getStorageName(discoveryKey) { function closeStorage(storage) { return promisify(storage.close.bind(storage))() } + +/** @param {import('random-access-storage')} storage */ +function unlinkStorage(storage) { + return promisify(storage.unlink.bind(storage))() +} diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 638be0b..1597d17 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -13,17 +13,14 @@ const { once } = require('events') * @template {ValueEncoding} [T='binary'] * @typedef {import('./core-index-stream').CoreIndexStream} CoreIndexStream */ -const kReadPromise = Symbol('readPromise') -const kHandleReadable = Symbol('handleReadable') -const kDestroyPromise = Symbol('destroyPromise') -const kHandleIndexing = Symbol('handleIndexing') -const kHandleDrained = Symbol('handleDrained') /** * @template {ValueEncoding} [T='binary'] * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class MultiCoreIndexStream extends Readable { + #handleIndexingBound + #handleDrainedBound /** @type {Map, () => void>} */ #streams = new Map() /** @type {Map>} */ @@ -49,8 +46,8 @@ class MultiCoreIndexStream extends Readable { byteLength: () => 1, }) this.#drained = streams.length === 0 - this[kHandleIndexing] = this[kHandleIndexing].bind(this) - this[kHandleDrained] = this[kHandleDrained].bind(this) + this.#handleIndexingBound = this.#handleIndexing.bind(this) + this.#handleDrainedBound = this.#handleDrained.bind(this) for (const s of streams) { this.addStream(s) } @@ -89,7 +86,7 @@ class MultiCoreIndexStream extends Readable { if (this.#streams.has(stream)) return this.#drained = false // Do this so that we can remove this listener when we destroy the stream - const handleReadableFn = this[kHandleReadable].bind(this, stream) + const handleReadableFn = this.#handleReadable.bind(this, stream) this.#streams.set(stream, handleReadableFn) stream.core .ready() @@ -102,8 +99,20 @@ class MultiCoreIndexStream extends Readable { .catch(noop) this.#readable.add(stream) stream.on('readable', handleReadableFn) - stream.on('indexing', this[kHandleIndexing]) - stream.on('drained', this[kHandleDrained]) + stream.on('indexing', this.#handleIndexingBound) + stream.on('drained', this.#handleDrainedBound) + } + + /** + * Unlink all index files. This should only be called after close. 
+ */ + async unlink() { + /** @type {Array>} */ + const unlinkPromises = [] + for (const stream of this.#streams.keys()) { + unlinkPromises.push(stream.unlink()) + } + await Promise.all(unlinkPromises) } /** @param {any} cb */ @@ -113,7 +122,7 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -123,22 +132,22 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } - async [kDestroyPromise]() { + async #destroy() { const closePromises = [] for (const [stream, handleReadableFn] of this.#streams) { stream.off('readable', handleReadableFn) - stream.off('indexing', this[kHandleIndexing]) - stream.off('drained', this[kHandleDrained]) + stream.off('indexing', this.#handleIndexingBound) + stream.off('drained', this.#handleDrainedBound) stream.destroy() closePromises.push(once(stream, 'close')) } await Promise.all(closePromises) } - async [kReadPromise]() { + async #read() { let didPush = false if (!this.#readable.size && !this.#destroying) { await (this.#pending = pDefer()).promise @@ -156,12 +165,12 @@ class MultiCoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } } /** @param {CoreIndexStream} stream */ - [kHandleReadable](stream) { + #handleReadable(stream) { this.#readable.add(stream) this.#pending.resolve() } @@ -170,16 +179,19 @@ class MultiCoreIndexStream extends Readable { // `indexing` event always fires at the start of indexing in the chain of // streams (the `drained` event should happen at the end of the chain once // everything is read) - [kHandleIndexing]() { + #handleIndexing() { if (!this.#drained) return this.#drained = false this.emit('indexing') } - [kHandleDrained]() { + #handleDrained() { let drained = true for (const stream of this.#streams.keys()) { - if (!stream.drained) drained = false + if (!stream.drained) { + drained = false + break + } } if (drained === this.#drained && !drained) return this.#drained = drained diff --git a/package-lock.json b/package-lock.json index 9539a0d..3b0019c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,14 +13,13 @@ "@types/streamx": "^2.9.1", "b4a": "^1.6.4", "big-sparse-array": "^1.0.2", - "debug": "^4.3.3", "random-access-file": "^4.0.4", "streamx": "^2.15.0", "tiny-typed-emitter": "^2.1.0" }, "devDependencies": { "@digidem/types": "^1.0.1", - "@types/debug": "^4.1.8", + "@types/nanobench": "^3.0.0", "@types/sodium-native": "^2.3.5", "@types/tap": "^15.0.8", "brittle": "^3.3.2", @@ -614,15 +613,6 @@ "node": ">= 8" } }, - "node_modules/@types/debug": { - "version": "4.1.8", - "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.8.tgz", - "integrity": "sha512-/vPO1EPOs306Cvhwv7KfVfYvOJqA/S/AXjaHQiJboCZzcNDb+TIJFN9/2C9DZ//ijSKWioNyUxD792QmDJ+HKQ==", - "dev": true, - "dependencies": { - "@types/ms": "*" - } - }, "node_modules/@types/istanbul-lib-coverage": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.4.tgz", @@ -635,10 +625,10 @@ "integrity": "sha512-Klz949h02Gz2uZCMGwDUSDS1YBlTdDDgbWHi+81l29tQALUtvz4rAYi5uoVhE5Lagoq6DeqAUlbrHvW/mXDgdQ==", "dev": true }, - "node_modules/@types/ms": { - "version": "0.7.31", - "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz", - "integrity": 
"sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==", + "node_modules/@types/nanobench": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@types/nanobench/-/nanobench-3.0.0.tgz", + "integrity": "sha512-ZLUczwWDAdILwoIigOJYHsuecrKqxq8tY3uVCgWLUkFNYIIIYQcFhrVwCOkDLDYOoXbwhxgfCWjszmHUEBGHMA==", "dev": true }, "node_modules/@types/node": { @@ -1677,6 +1667,7 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, "dependencies": { "ms": "2.1.2" }, @@ -4271,7 +4262,8 @@ "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true }, "node_modules/multifeed": { "version": "6.0.0", @@ -9279,15 +9271,6 @@ "fastq": "^1.6.0" } }, - "@types/debug": { - "version": "4.1.8", - "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.8.tgz", - "integrity": "sha512-/vPO1EPOs306Cvhwv7KfVfYvOJqA/S/AXjaHQiJboCZzcNDb+TIJFN9/2C9DZ//ijSKWioNyUxD792QmDJ+HKQ==", - "dev": true, - "requires": { - "@types/ms": "*" - } - }, "@types/istanbul-lib-coverage": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.4.tgz", @@ -9300,10 +9283,10 @@ "integrity": "sha512-Klz949h02Gz2uZCMGwDUSDS1YBlTdDDgbWHi+81l29tQALUtvz4rAYi5uoVhE5Lagoq6DeqAUlbrHvW/mXDgdQ==", "dev": true }, - "@types/ms": { - "version": "0.7.31", - "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz", - "integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==", + "@types/nanobench": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@types/nanobench/-/nanobench-3.0.0.tgz", + "integrity": "sha512-ZLUczwWDAdILwoIigOJYHsuecrKqxq8tY3uVCgWLUkFNYIIIYQcFhrVwCOkDLDYOoXbwhxgfCWjszmHUEBGHMA==", "dev": true }, "@types/node": { @@ -10120,6 +10103,7 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, "requires": { "ms": "2.1.2" } @@ -12070,7 +12054,8 @@ "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true }, "multifeed": { "version": "6.0.0", diff --git a/package.json b/package.json index 4dae2d2..1829143 100644 --- a/package.json +++ b/package.json @@ -21,14 +21,13 @@ "@types/streamx": "^2.9.1", "b4a": "^1.6.4", "big-sparse-array": "^1.0.2", - "debug": "^4.3.3", "random-access-file": "^4.0.4", "streamx": "^2.15.0", "tiny-typed-emitter": "^2.1.0" }, "devDependencies": { "@digidem/types": "^1.0.1", - "@types/debug": "^4.1.8", + "@types/nanobench": "^3.0.0", "@types/sodium-native": "^2.3.5", "@types/tap": "^15.0.8", "brittle": "^3.3.2", diff --git a/test/helpers/index.js b/test/helpers/index.js index 36d64c9..65f229f 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -6,6 +6,7 @@ 
const ram = require('random-access-memory') const BLOCK_LENGTH = Buffer.from('block000000').byteLength /** @typedef {import('../../lib/types').Entry<'binary'>} Entry */ +/** @typedef {import('node:events').EventEmitter} EventEmitter */ module.exports = { create, @@ -14,6 +15,7 @@ module.exports = { generateFixtures, createMultiple, throttledDrain, + throttledIdle, sortEntries, logEntries, blocksToExpected, @@ -87,24 +89,37 @@ async function generateFixtures(cores, count) { * The index stream can become momentarily drained between reads and * appends/downloads of new data. This throttle drained will resolve only when * the stream has remained drained for > 10ms - * @param {import('events').EventEmitter} emitter + * @param {EventEmitter} emitter * @returns {Promise} */ -async function throttledDrain(emitter) { +function throttledDrain(emitter) { + return throttledStreamEvent(emitter, 'drained') +} + +function throttledIdle(emitter) { + return throttledStreamEvent(emitter, 'idle') +} + +/** + * @param {EventEmitter} emitter + * @param {string} eventName + * @returns {Promise} + */ +function throttledStreamEvent(emitter, eventName) { return new Promise((resolve) => { /** @type {ReturnType} */ let timeoutId - function onDrained() { + function onEvent() { clearTimeout(timeoutId) timeoutId = setTimeout(() => { - emitter.off('drained', onDrained) + emitter.off(eventName, onEvent) emitter.off('indexing', onIndexing) resolve() }, 10) } - emitter.on('drained', onDrained) + emitter.on(eventName, onEvent) emitter.on('indexing', onIndexing) function onIndexing() { clearTimeout(timeoutId) diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index 62696dc..b2ff9c8 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -329,6 +329,36 @@ test('Entries are re-indexed if index storage reset', async (t) => { t.pass('Indexer closed') }) +test('Entries are re-indexed if index storage unlinked', async (t) => { + const cores = await createMultiple(5) + + const createRAM = ram.reusable() + + const indexer1 = new MultiCoreIndexer(cores, { + batch: async () => {}, + storage: createRAM, + }) + const expected = await generateFixtures(cores, 3) + await indexer1.idle() + await indexer1.close() + await indexer1.unlink() + + /** @type {Entry[]} */ + const entries = [] + const indexer2 = new MultiCoreIndexer(cores, { + batch: async (data) => { + entries.push(...data) + }, + storage: createRAM, + }) + await indexer2.idle() + + t.same(sortEntries(entries), sortEntries(expected)) + + await indexer2.close() + t.pass('Indexer closed') +}) + test('Entries are batched to batchMax when indexing is slower than Hypercore reads', async (t) => { const cores = await createMultiple(5) await generateFixtures(cores, 500) diff --git a/vendor/types/nanobench.d.ts b/vendor/types/nanobench.d.ts deleted file mode 100644 index c376ee6..0000000 --- a/vendor/types/nanobench.d.ts +++ /dev/null @@ -1,8 +0,0 @@ -declare module 'nanobench' { - interface Bench { - start(): void - end(): void - } - function nanobench(name: string, fn: (b: Bench) => void): void - export = nanobench -}
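
---

Usage note (not part of the diff): a minimal sketch of the new `unlink()` API documented in the README hunk above, assuming an existing array of `cores`, an async `batch` function, and the package's default export; the reusable in-memory storage mirrors the new "re-indexed if index storage unlinked" test case.

```js
const MultiCoreIndexer = require('multi-core-indexer')
const ram = require('random-access-memory')

// Reusable RAM storage, as in the new test, so index state survives close()
// until it is explicitly unlinked.
const storage = ram.reusable()

const indexer = new MultiCoreIndexer(cores, {
  batch: async (entries) => {
    // process a batch of entries here
  },
  storage,
})

await indexer.idle() // resolves once indexing state is 'idle'
await indexer.close() // flush index state; unlink() must only be called after this resolves
await indexer.unlink() // remove index files, so a fresh indexer re-indexes everything
```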