Skip to content

Commit

Permalink
feat: add method to unlink storage
Browse files Browse the repository at this point in the history
This adds `MultiCoreIndexer.prototype.unlink()` which unlinks all the
index storage.

Addresses #26.
  • Loading branch information
EvanHahn committed Jan 19, 2024
1 parent 259844a commit 4d8c94a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 5 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ hypercores already in the indexer.
Stop the indexer and flush index state to storage. This will not close the
underlying storage - it is up to the consumer to do that.

### indexer.unlink()

Unlink all index files. This should only be called after `close()` has resolved.

### indexer.on('index-state', onState)

#### onState
Expand Down
7 changes: 7 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class MultiCoreIndexer extends TypedEmitter {
])
}

/**
* Unlink all index files. This should only be called after `close()` has resolved.
*/
async unlink() {
await this.#indexStream.unlink()
}

/** @param {Entry<T>[]} entries */
async #handleEntries(entries) {
this.#emitState()
Expand Down
23 changes: 18 additions & 5 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ class CoreIndexStream extends Readable {
byteLength: () => 1,
})
this.#core = core
this.#createStorage = createStorage
this.#handleAppendBound = this.#handleAppend.bind(this)
this.#handleDownloadBound = this.#handleDownload.bind(this)
this.#createStorage = async () => {
await this.#core.ready()
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
return createStorage(getStorageName(discoveryKey))
}
}

get remaining() {
Expand Down Expand Up @@ -105,6 +111,11 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, false)
}

async unlink() {
this.#storage ??= await this.#createStorage()
await unlinkStorage(this.#storage)
}

async #destroy() {
this.#core.removeListener('append', this.#handleAppendBound)
this.#core.removeListener('download', this.#handleDownloadBound)
Expand All @@ -115,10 +126,7 @@ class CoreIndexStream extends Readable {
async #open() {
await this.#core.ready()
await this.#core.update({ wait: true })
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
this.#storage = this.#createStorage(getStorageName(discoveryKey))
this.#storage ??= await this.#createStorage()
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
this.#core.on('append', this.#handleAppendBound)
Expand Down Expand Up @@ -212,3 +220,8 @@ function getStorageName(discoveryKey) {
function closeStorage(storage) {
return promisify(storage.close.bind(storage))()
}

/** @param {import('random-access-storage')} storage */
function unlinkStorage(storage) {
return promisify(storage.unlink.bind(storage))()
}
12 changes: 12 additions & 0 deletions lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ class MultiCoreIndexStream extends Readable {
stream.on('drained', this.#handleDrainedBound)
}

/**
* Unlink all index files. This should only be called after close.
*/
async unlink() {
/** @type {Array<Promise<unknown>>} */
const unlinkPromises = []
for (const stream of this.#streams.keys()) {
unlinkPromises.push(stream.unlink())
}
await Promise.all(unlinkPromises)
}

/** @param {any} cb */
_open(cb) {
cb()
Expand Down
30 changes: 30 additions & 0 deletions test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,36 @@ test('Entries are re-indexed if index storage reset', async (t) => {
t.pass('Indexer closed')
})

test('Entries are re-indexed if index storage unlinked', async (t) => {
const cores = await createMultiple(5)

const createRAM = ram.reusable()

const indexer1 = new MultiCoreIndexer(cores, {
batch: async () => {},
storage: createRAM,
})
const expected = await generateFixtures(cores, 3)
await indexer1.idle()
await indexer1.close()
await indexer1.unlink()

/** @type {Entry[]} */
const entries = []
const indexer2 = new MultiCoreIndexer(cores, {
batch: async (data) => {
entries.push(...data)
},
storage: createRAM,
})
await indexer2.idle()

t.same(sortEntries(entries), sortEntries(expected))

await indexer2.close()
t.pass('Indexer closed')
})

test('Entries are batched to batchMax when indexing is slower than Hypercore reads', async (t) => {
const cores = await createMultiple(5)
await generateFixtures(cores, 500)
Expand Down

0 comments on commit 4d8c94a

Please sign in to comment.