feat: add method to unlink storage #34

Merged 2 commits on Jan 30, 2024

4 changes: 4 additions & 0 deletions README.md
@@ -146,6 +146,10 @@ hypercores already in the indexer.
 Stop the indexer and flush index state to storage. This will not close the
 underlying storage - it is up to the consumer to do that.

+### indexer.unlink()
+
+Unlink all index files. This should only be called after `close()` has resolved.
+
 ### indexer.on('index-state', onState)

 #### onState
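The README addition above says `unlink()` should only run once `close()` has resolved. A minimal sketch of that call order follows. `closeAndUnlink` and `createFakeIndexer` are hypothetical names introduced only for illustration; the fake stands in for a real indexer so the snippet runs without Hypercore.

```javascript
// Hypothetical helper (not part of the library): enforce the documented order.
async function closeAndUnlink(indexer) {
  // Wait for close() to resolve so index state is flushed first.
  await indexer.close()
  // Only then remove the index files.
  await indexer.unlink()
}

// Hypothetical stand-in that records the order of calls.
// A real caller would pass a MultiCoreIndexer instance instead.
function createFakeIndexer() {
  const calls = []
  return {
    calls,
    async close() {
      calls.push('close')
    },
    async unlink() {
      calls.push('unlink')
    },
  }
}
```

Wrapping the two awaits in one helper is just one way to make it hard to call `unlink()` too early; callers are free to sequence the calls themselves.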
7 changes: 7 additions & 0 deletions index.js
@@ -116,6 +116,13 @@ class MultiCoreIndexer extends TypedEmitter {
     ])
   }

+  /**
+   * Unlink all index files. This should only be called after `close()` has resolved.
+   */
+  async unlink() {
+    await this.#indexStream.unlink()
+  }
+
   /** @param {Entry<T>[]} entries */
   async #handleEntries(entries) {
     this.#emitState()
23 changes: 18 additions & 5 deletions lib/core-index-stream.js
@@ -55,9 +55,15 @@ class CoreIndexStream extends Readable {
       byteLength: () => 1,
     })
     this.#core = core
-    this.#createStorage = createStorage
     this.#handleAppendBound = this.#handleAppend.bind(this)
     this.#handleDownloadBound = this.#handleDownload.bind(this)
+    this.#createStorage = async () => {
+      await this.#core.ready()
+      const { discoveryKey } = this.#core
+      /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
+      if (!discoveryKey) throw new Error('Missing discovery key')
+      return createStorage(getStorageName(discoveryKey))
+    }
   }

   get remaining() {
@@ -105,6 +111,11 @@ class CoreIndexStream extends Readable {
     this.#inProgressBitfield?.set(index, false)
   }

+  async unlink() {
+    this.#storage ??= await this.#createStorage()
+    await unlinkStorage(this.#storage)
Comment on lines +115 to +116
Member

i'm assuming there's an edge case that this accounts for, but just in case: would a no-op (or throw) be possible here if the storage isn't initialized? e.g. `if (!this.#storage) return`

this seems to be performing operations that cancel each other out if the storage isn't already initialized, but maybe it's much more complex than that

Contributor Author

There might be cases where the storage has been created on disk but hasn't been initialized in memory. For example:

  1. I start the app, index a bunch of stuff, and then close it.
  2. Later, I start the app and immediately unlink. `this.#storage` won't have been created in memory yet, so we create it in order to unlink it.

My understanding is that creating the storage instance doesn't actually touch any files on disk. For example:

const RandomAccessFile = require("random-access-file")
const fs = require("node:fs")

new RandomAccessFile("my-file.txt")

fs.existsSync("my-file.txt")
// => false

Maybe it's worth adding an explainer comment about this?

Member

ah okay that makes more sense! I was under the impression that this class fully manages the storage on disk but if it's being done somewhere else, your reasoning is clearer. thanks!
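The point settled in this thread can be sketched in isolation: the storage handle is created lazily with `??=`, so `unlink()` works even if the stream was never opened in this process, and a later open reuses the same handle. This is an illustration under assumptions, not the class from the diff. `LazyStorageOwner` and `createFakeStorage` are hypothetical names, and the fake storage only mimics a callback-style `unlink(cb)`.

```javascript
// Hypothetical in-memory stand-in for a storage instance with unlink(cb).
function createFakeStorage(log) {
  return {
    unlink(cb) {
      log.push('unlink')
      process.nextTick(cb, null)
    },
  }
}

// Hypothetical class mirroring only the lazy-creation pattern from the diff.
class LazyStorageOwner {
  #storage
  #createStorage

  /** @param {() => Promise<any>} createStorage async factory for the storage handle */
  constructor(createStorage) {
    this.#createStorage = createStorage
  }

  async open() {
    // Reuses the handle if unlink() already created it.
    this.#storage ??= await this.#createStorage()
  }

  async unlink() {
    // Creates the handle on demand, so unlink() works without a prior open().
    this.#storage ??= await this.#createStorage()
    const storage = this.#storage
    await new Promise((resolve, reject) => {
      storage.unlink((err) => (err ? reject(err) : resolve()))
    })
  }
}
```

Because `??=` short-circuits, the factory runs at most once per instance when the calls are awaited in sequence, whichever of `open()` or `unlink()` is called first.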

+  }
+
   async #destroy() {
     this.#core.removeListener('append', this.#handleAppendBound)
     this.#core.removeListener('download', this.#handleDownloadBound)
@@ -115,10 +126,7 @@ class CoreIndexStream extends Readable {
   async #open() {
     await this.#core.ready()
     await this.#core.update({ wait: true })
-    const { discoveryKey } = this.#core
-    /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
-    if (!discoveryKey) throw new Error('Missing discovery key')
-    this.#storage = this.#createStorage(getStorageName(discoveryKey))
+    this.#storage ??= await this.#createStorage()
     this.#indexedBitfield = await Bitfield.open(this.#storage)
     this.#inProgressBitfield = await new Bitfield()
     this.#core.on('append', this.#handleAppendBound)
@@ -212,3 +220,8 @@ function getStorageName(discoveryKey) {
 function closeStorage(storage) {
   return promisify(storage.close.bind(storage))()
 }
+
+/** @param {import('random-access-storage')} storage */
+function unlinkStorage(storage) {
+  return promisify(storage.unlink.bind(storage))()
+}
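The `unlinkStorage` helper added above wraps a callback-style method with `promisify` after binding it to its receiver. A small runnable sketch of the same pattern follows. `FakeStorage` is a hypothetical class used only so the example is self-contained; it is not the `random-access-storage` implementation.

```javascript
const { promisify } = require('node:util')

// Hypothetical callback-style storage whose unlink() depends on `this`.
class FakeStorage {
  constructor() {
    this.unlinked = false
  }

  unlink(cb) {
    // Reads and writes instance state, so the receiver must be preserved.
    this.unlinked = true
    process.nextTick(cb, null)
  }
}

// Same shape as the helper in the diff: bind first, then promisify, then call.
function unlinkStorage(storage) {
  return promisify(storage.unlink.bind(storage))()
}
```

The `bind` matters because `promisify(storage.unlink)` on its own would detach the method from `storage`, and a method that touches `this` would then most likely fail.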
12 changes: 12 additions & 0 deletions lib/multi-core-index-stream.js
@@ -103,6 +103,18 @@ class MultiCoreIndexStream extends Readable {
     stream.on('drained', this.#handleDrainedBound)
   }

+  /**
+   * Unlink all index files. This should only be called after close.
+   */
+  async unlink() {
+    /** @type {Array<Promise<unknown>>} */
+    const unlinkPromises = []
+    for (const stream of this.#streams.keys()) {
+      unlinkPromises.push(stream.unlink())
+    }
+    await Promise.all(unlinkPromises)
+  }
+
   /** @param {any} cb */
   _open(cb) {
     cb()
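The `unlink()` added to `MultiCoreIndexStream` starts every per-core unlink before awaiting any of them. A standalone sketch of that fan-out is below. `createFakeStream` and `unlinkAll` are hypothetical names, and the `Map` keyed by stream is an assumption drawn from the `this.#streams.keys()` call in the diff.

```javascript
// Hypothetical stand-in for a per-core stream with an async unlink().
function createFakeStream(name, log) {
  return {
    async unlink() {
      log.push(name)
    },
  }
}

// Start every unlink first, then wait for all of them together.
async function unlinkAll(streams) {
  /** @type {Array<Promise<unknown>>} */
  const unlinkPromises = []
  for (const stream of streams.keys()) {
    unlinkPromises.push(stream.unlink())
  }
  // Rejects as soon as any single unlink rejects.
  await Promise.all(unlinkPromises)
}
```

One consequence of `Promise.all` is that a single failed unlink rejects the whole call while the remaining unlinks keep running in the background. Whether that is acceptable depends on how callers want to handle partial cleanup.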
30 changes: 30 additions & 0 deletions test/multi-core-indexer.test.js
@@ -329,6 +329,36 @@ test('Entries are re-indexed if index storage reset', async (t) => {
   t.pass('Indexer closed')
 })

+test('Entries are re-indexed if index storage unlinked', async (t) => {
+  const cores = await createMultiple(5)
+
+  const createRAM = ram.reusable()
+
+  const indexer1 = new MultiCoreIndexer(cores, {
+    batch: async () => {},
+    storage: createRAM,
+  })
+  const expected = await generateFixtures(cores, 3)
+  await indexer1.idle()
+  await indexer1.close()
+  await indexer1.unlink()
+
+  /** @type {Entry[]} */
+  const entries = []
+  const indexer2 = new MultiCoreIndexer(cores, {
+    batch: async (data) => {
+      entries.push(...data)
+    },
+    storage: createRAM,
+  })
+  await indexer2.idle()
+
+  t.same(sortEntries(entries), sortEntries(expected))
+
+  await indexer2.close()
+  t.pass('Indexer closed')
+})
+
 test('Entries are batched to batchMax when indexing is slower than Hypercore reads', async (t) => {
   const cores = await createMultiple(5)
   await generateFixtures(cores, 500)