Skip to content

Commit

Permalink
Merge branch 'main' into nonready-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn committed Feb 22, 2024
2 parents b4b0402 + 98a0e25 commit 05d6ece
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:

strategy:
matrix:
node-version: [14.x, 16.x]
node-version: [16.17.1, 18.17.1, 20.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
Expand Down
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
14
20
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ hypercores already in the indexer.
Stop the indexer and flush index state to storage. This will not close the
underlying storage - it is up to the consumer to do that.

### indexer.unlink()

Unlink all index files. This should only be called after `close()` has resolved.

### indexer.on('index-state', onState)

#### onState
Expand Down
33 changes: 17 additions & 16 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const { Writable } = require('streamx')
const { TypedEmitter } = require('tiny-typed-emitter')
const { once } = require('events')
const raf = require('random-access-file')
// const log = require('debug')('multi-core-indexer')
const { CoreIndexStream } = require('./lib/core-index-stream')
const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream')
const { pDefer } = require('./lib/utils.js')
Expand All @@ -12,9 +11,6 @@ const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
// moving average. A factor > 1 will put more weight on previous values.
const MOVING_AVG_FACTOR = 5
const kHandleEntries = Symbol('handleEntries')
const kEmitState = Symbol('emitState')
const kGetState = Symbol('getState')

/** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */
/** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */
Expand All @@ -35,7 +31,6 @@ class MultiCoreIndexer extends TypedEmitter {
#batch
/** @type {import('./lib/types').IndexStateCurrent} */
#state = 'indexing'
#lastRemaining = -1
#rateMeasurementStart = Date.now()
#rate = 0
#createStorage
Expand Down Expand Up @@ -66,15 +61,14 @@ class MultiCoreIndexer extends TypedEmitter {
this.#writeStream = /** @type {Writable<Entry<T>>} */ (
new Writable({
writev: (entries, cb) => {
// @ts-ignore - I don't know why TS does not like this
this[kHandleEntries](entries).then(() => cb(), cb)
this.#handleEntries(entries).then(() => cb(), cb)
},
highWaterMark: maxBatch,
byteLength: () => 1,
})
)
this.#indexStream.pipe(this.#writeStream)
this.#emitStateBound = this[kEmitState].bind(this)
this.#emitStateBound = this.#emitState.bind(this)
// This is needed because the source streams can start indexing before this
// stream starts reading data. This ensures that the indexing state is
// emitted when the source cores first append / download data
Expand All @@ -88,7 +82,7 @@ class MultiCoreIndexer extends TypedEmitter {
* @type {IndexState}
*/
get state() {
return this[kGetState]()
return this.#getState()
}

/**
Expand All @@ -104,7 +98,7 @@ class MultiCoreIndexer extends TypedEmitter {
* Resolves when indexing state is 'idle'
*/
async idle() {
if (this[kGetState]().current === 'idle') return
if (this.#getState().current === 'idle') return
if (!this.#pendingIdle) {
this.#pendingIdle = pDefer()
}
Expand All @@ -122,9 +116,16 @@ class MultiCoreIndexer extends TypedEmitter {
])
}

/**
* Unlink all index files. This should only be called after `close()` has resolved.
*/
async unlink() {
await this.#indexStream.unlink()
}

/** @param {Entry<T>[]} entries */
async [kHandleEntries](entries) {
this[kEmitState]()
async #handleEntries(entries) {
this.#emitState()
/* istanbul ignore if - not sure this is necessary, but better safe than sorry */
if (!entries.length) return
await this.#batch(entries)
Expand All @@ -140,11 +141,11 @@ class MultiCoreIndexer extends TypedEmitter {
// Set this at the end of batch rather than start so the timing also
// includes the reads from the index streams
this.#rateMeasurementStart = Date.now()
this[kEmitState]()
this.#emitState()
}

[kEmitState]() {
const state = this[kGetState]()
#emitState() {
const state = this.#getState()
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
Expand All @@ -155,7 +156,7 @@ class MultiCoreIndexer extends TypedEmitter {
this.#prevEmittedState = state
}

[kGetState]() {
#getState() {
const remaining = this.#indexStream.remaining
const drained = this.#indexStream.drained
const prevState = this.#state
Expand Down
68 changes: 38 additions & 30 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ const Bitfield = require('./bitfield')
const { pDefer } = require('./utils')
const { promisify } = require('node:util')

const kReadPromise = Symbol('readPromise')
const kOpenPromise = Symbol('openPromise')
const kDestroyPromise = Symbol('destroyPromise')
const kHandleAppend = Symbol('handleAppend')
const kHandleDownload = Symbol('handleDownload')
const kPushEntry = Symbol('pushEntry')

/** @typedef {import('./types').ValueEncoding} ValueEncoding */
/** @typedef {import('./types').JSONValue} JSONValue */
/**
Expand All @@ -30,6 +23,8 @@ const kPushEntry = Symbol('pushEntry')
* @extends {Readable<Entry<T>, Entry<T>, Entry<T>, true, false, import('./types').IndexStreamEvents<Entry<T>>>}
*/
class CoreIndexStream extends Readable {
#handleAppendBound
#handleDownloadBound
/** @type {Bitfield | undefined} */
#indexedBitfield
/** @type {Bitfield | undefined} */
Expand Down Expand Up @@ -60,9 +55,15 @@ class CoreIndexStream extends Readable {
byteLength: () => 1,
})
this.#core = core
this.#createStorage = createStorage
this[kHandleAppend] = this[kHandleAppend].bind(this)
this[kHandleDownload] = this[kHandleDownload].bind(this)
this.#handleAppendBound = this.#handleAppend.bind(this)
this.#handleDownloadBound = this.#handleDownload.bind(this)
this.#createStorage = async () => {
await this.#core.ready()
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
return createStorage(getStorageName(discoveryKey))
}
}

get remaining() {
Expand All @@ -81,12 +82,12 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_open(cb) {
this[kOpenPromise]().then(cb, cb)
this.#open().then(cb, cb)
}

/** @param {any} cb */
_read(cb) {
this[kReadPromise]().then(cb, cb)
this.#read().then(cb, cb)
}

_predestroy() {
Expand All @@ -96,7 +97,7 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_destroy(cb) {
this[kDestroyPromise]().then(cb, cb)
this.#destroy().then(cb, cb)
}

/**
Expand All @@ -110,27 +111,29 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, false)
}

async [kDestroyPromise]() {
this.#core.removeListener('append', this[kHandleAppend])
this.#core.removeListener('download', this[kHandleDownload])
async unlink() {
this.#storage ??= await this.#createStorage()
await unlinkStorage(this.#storage)
}

async #destroy() {
this.#core.removeListener('append', this.#handleAppendBound)
this.#core.removeListener('download', this.#handleDownloadBound)
await this.#indexedBitfield?.flush()
if (this.#storage) await closeStorage(this.#storage)
}

async [kOpenPromise]() {
async #open() {
await this.#core.ready()
await this.#core.update({ wait: true })
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
this.#storage = this.#createStorage(getStorageName(discoveryKey))
this.#storage ??= await this.#createStorage()
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
this.#core.on('append', this[kHandleAppend])
this.#core.on('download', this[kHandleDownload])
this.#core.on('append', this.#handleAppendBound)
this.#core.on('download', this.#handleDownloadBound)
}

async [kReadPromise]() {
async #read() {
if (this.#index >= this.#core.length && this.#downloaded.size === 0) {
this.#drained = true
this.emit('drained')
Expand All @@ -142,7 +145,7 @@ class CoreIndexStream extends Readable {
let didPush = false
this.#readBufferAvailable = true
while (this.#readBufferAvailable && this.#index < this.#core.length) {
didPush = (await this[kPushEntry](this.#index)) || didPush
didPush = (await this.#pushEntry(this.#index)) || didPush
// Don't increment this until after the async push above
this.#index++
}
Expand All @@ -151,7 +154,7 @@ class CoreIndexStream extends Readable {
for (const index of this.#downloaded) {
this.#downloaded.delete(index)
didPush =
(await this[kPushEntry](index)) ||
(await this.#pushEntry(index)) ||
/* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */
didPush
// This is for back-pressure, for which there is not a good test yet.
Expand All @@ -166,7 +169,7 @@ class CoreIndexStream extends Readable {
}
if (!didPush && !this.#destroying) {
// If nothing was pushed, queue up another read
await this[kReadPromise]()
await this.#read()
}
await this.#indexedBitfield?.flush()
}
Expand All @@ -177,7 +180,7 @@ class CoreIndexStream extends Readable {
* @param {number} index
* @returns {Promise<boolean>}
*/
async [kPushEntry](index) {
async #pushEntry(index) {
const isProcessed =
this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index)
if (isProcessed) return false
Expand All @@ -192,14 +195,14 @@ class CoreIndexStream extends Readable {
return true
}

async [kHandleAppend]() {
async #handleAppend() {
this.#pending.resolve()
}

/**
* @param {number} index
*/
async [kHandleDownload](index) {
async #handleDownload(index) {
this.#downloaded.add(index)
this.#pending.resolve()
}
Expand All @@ -217,3 +220,8 @@ function getStorageName(discoveryKey) {
/**
 * Close a storage instance, adapting its callback-style `close(cb)` method
 * to a promise.
 *
 * @param {import('random-access-storage')} storage
 * @returns {Promise<void>} Rejects with the error passed to the callback, if any.
 */
function closeStorage(storage) {
  // Bind first so the method keeps `storage` as its receiver once detached
  const close = storage.close.bind(storage)
  const closeAsync = promisify(close)
  return closeAsync()
}

/**
 * Unlink a storage instance, adapting its callback-style `unlink(cb)` method
 * to a promise.
 *
 * @param {import('random-access-storage')} storage
 * @returns {Promise<void>} Rejects with the error passed to the callback, if any.
 */
function unlinkStorage(storage) {
  // Bind first so the method keeps `storage` as its receiver once detached
  const unlink = storage.unlink.bind(storage)
  const unlinkAsync = promisify(unlink)
  return unlinkAsync()
}
Loading

0 comments on commit 05d6ece

Please sign in to comment.