Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: discoveryId instead of key for entries #25

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ class MultiCoreIndexer extends TypedEmitter {
/* c8 ignore next - not sure this is necessary, but better safe than sorry */
if (!entries.length) return
await this.#batch(entries)
for (const { key, index } of entries) {
this.#indexStream.setIndexed(key.toString('hex'), index)
for (const { discoveryId, index } of entries) {
this.#indexStream.setIndexed(discoveryId, index)
}
const batchTime = Date.now() - this.#rateMeasurementStart
// Current rate entries per second
Expand Down
7 changes: 5 additions & 2 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class CoreIndexStream extends Readable {
#readBufferAvailable = true
#destroying = false
#drained = false
/** @type {string | undefined} */
#coreDiscoveryId

/**
* @param {import('hypercore')<T, any>} core
Expand Down Expand Up @@ -125,6 +127,7 @@ class CoreIndexStream extends Readable {

async #open() {
await this.#core.ready()
this.#coreDiscoveryId = this.#core.discoveryKey?.toString('hex')
await this.#core.update({ wait: true })
this.#storage ??= await this.#createStorage()
this.#indexedBitfield = await Bitfield.open(this.#storage)
Expand Down Expand Up @@ -189,8 +192,8 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, true)
this.#inProgress++
/* c8 ignore next: this should always be set at this point */
if (!this.#core.key) throw new Error('Missing core key')
const entry = { key: this.#core.key, block, index }
if (!this.#coreDiscoveryId) throw new Error('Missing core discoveryId')
const entry = { discoveryId: this.#coreDiscoveryId, block, index }
this.#readBufferAvailable = this.push(entry)
return true
}
Expand Down
6 changes: 3 additions & 3 deletions lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ class MultiCoreIndexStream extends Readable {
stream.core
.ready()
.then(() => {
const coreKey = stream.core.key
const discoveryKey = stream.core.discoveryKey
/* c8 ignore next: this is set after ready */
if (!coreKey) return
this.#streamsById.set(coreKey.toString('hex'), stream)
if (!discoveryKey) return
this.#streamsById.set(discoveryKey.toString('hex'), stream)
})
.catch(noop)
this.#readable.add(stream)
Expand Down
2 changes: 1 addition & 1 deletion lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export type ValueEncoding = 'binary' | 'utf-8' | 'json'

export interface Entry<T extends ValueEncoding = 'binary'> {
index: number
key: Buffer
discoveryId: string
block: T extends 'binary' ? Buffer : T extends 'utf-8' ? string : JSONValue
}

Expand Down
19 changes: 11 additions & 8 deletions test/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ async function generateFixtures(cores, count) {
const offset = core.length
const blocks = generateFixture(offset, offset + count)
await core.append(blocks)
entries.push.apply(entries, blocksToExpected(blocks, core.key, offset))
entries.push.apply(
entries,
blocksToExpected(blocks, core.discoveryKey, offset)
)
}
return entries
}
Expand Down Expand Up @@ -133,8 +136,8 @@ function throttledStreamEvent(emitter, eventName) {
* @returns number
*/
function sort(a, b) {
const aKey = a.key.toString('hex') + a.block.toString()
const bKey = b.key.toString('hex') + b.block.toString()
const aKey = a.discoveryId + a.block.toString()
const bKey = b.discoveryId + b.block.toString()
return aKey < bKey ? -1 : aKey > bKey ? 1 : 0
}

Expand All @@ -146,12 +149,12 @@ function sortEntries(e) {
/**
*
* @param {Buffer[]} blocks
* @param {Buffer} key
* @returns
* @param {Buffer} discoveryKey
* @returns {Entry[]}
*/
function blocksToExpected(blocks, key, offset = 0) {
function blocksToExpected(blocks, discoveryKey, offset = 0) {
return blocks.map((block, i) => ({
key,
discoveryId: discoveryKey.toString('hex'),
block,
index: i + offset,
}))
Expand All @@ -173,7 +176,7 @@ async function createMultiple(n) {
function logEntries(e) {
console.log(
sortEntries(e).map((e) => ({
key: e.key.toString('hex'),
discoveryId: e.discoveryId,
block: e.block.toString(),
index: e.index,
}))
Expand Down
2 changes: 1 addition & 1 deletion test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @ts-check
const MultiCoreIndexer = require('../')
const test = require('node:test')
const { test } = require('node:test')
const assert = require('node:assert/strict')
const ram = require('random-access-memory')
const {
Expand Down
2 changes: 1 addition & 1 deletion test/unit-tests/bitfield.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const test = require('node:test')
const { test } = require('node:test')
const assert = require('node:assert/strict')
const ram = require('random-access-memory')
const Bitfield = require('../../lib/bitfield')
Expand Down
16 changes: 8 additions & 8 deletions test/unit-tests/core-index-stream.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @ts-check
const { CoreIndexStream } = require('../../lib/core-index-stream')
const test = require('node:test')
const { test } = require('node:test')
const assert = require('node:assert/strict')
const { once } = require('events')
const ram = require('random-access-memory')
Expand Down Expand Up @@ -46,7 +46,7 @@ test('unlink before open', async () => {
test('Indexes all items already in a core', async () => {
const a = await create()
const blocks = generateFixture(0, 10)
const expected = blocksToExpected(blocks, a.key)
const expected = blocksToExpected(blocks, a.discoveryKey)
await a.append(blocks)
/** @type {any[]} */
const entries = []
Expand All @@ -68,7 +68,7 @@ test('.remaining property is accurate', async () => {
const totalBlocks = 100
const a = await create()
const blocks = generateFixture(0, totalBlocks)
const expected = blocksToExpected(blocks, a.key)
const expected = blocksToExpected(blocks, a.discoveryKey)
await a.append(blocks)
/** @type {any[]} */
const entries = []
Expand All @@ -93,7 +93,7 @@ test('Indexes items appended after initial index', async () => {
stream.on('data', (entry) => entries.push(entry))
await once(stream, 'drained')
assert.deepEqual(entries, [], 'no entries before append')
const expected = blocksToExpected(blocks, a.key)
const expected = blocksToExpected(blocks, a.discoveryKey)
await a.append(blocks)
await once(stream, 'drained')
assert.deepEqual(entries, expected)
Expand Down Expand Up @@ -212,12 +212,12 @@ test('Maintains index state', async () => {
/**
*
* @param {Buffer[]} blocks
* @param {Buffer} key
* @returns
* @param {Buffer} discoveryKey
* @returns {import('../../').Entry[]}
*/
function blocksToExpected(blocks, key) {
function blocksToExpected(blocks, discoveryKey) {
return blocks.map((block, i) => ({
key,
discoveryId: discoveryKey.toString('hex'),
block,
index: i,
}))
Expand Down
17 changes: 12 additions & 5 deletions test/unit-tests/multi-core-index-stream.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// @ts-check
const { CoreIndexStream } = require('../../lib/core-index-stream')
const { MultiCoreIndexStream } = require('../../lib/multi-core-index-stream')
const test = require('node:test')
const { test } = require('node:test')
const assert = require('node:assert/strict')
const { once } = require('events')
const ram = require('random-access-memory')
Expand All @@ -15,6 +15,8 @@
sortEntries,
} = require('../helpers')

/** @typedef {import('../..').Entry} Entry */

test('Indexes all items already in a core', async () => {
const cores = await createMultiple(5)
const expected = await generateFixtures(cores, 1000)
Expand Down Expand Up @@ -46,12 +48,14 @@
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
/** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2))
stream.addStream(indexStreams[2])
// Check re-adding a stream that is already being indexed is a no-op
stream.addStream(indexStreams[1])
const ws = new Writable({
/** @type {(data: Entry[], cb: () => void) => void} */
writev: (data, cb) => {
entries.push(...data)
cb()
Expand All @@ -68,21 +72,23 @@
await once(ws, 'close').catch(() => {})
})

test('.remaining is as expected', async () => {
test('.remaining is as expected', { only: true }, async () => {

Check notice on line 75 in test/unit-tests/multi-core-index-stream.test.js

View workflow job for this annotation

GitHub Actions / build (20.x)

'only' and 'runOnly' require the --test-only command-line option.
const coreCount = 5
const blockCount = 100
const cores = await createMultiple(coreCount)
const expected = await generateFixtures(cores, blockCount)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
/** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
const ws = new Writable({
/** @type {(data: Entry[], cb: () => void) => void} */
writev: (data, cb) => {
entries.push(...data)
for (const { key, index } of data) {
stream.setIndexed(key.toString('hex'), index)
for (const { discoveryId, index } of data) {
stream.setIndexed(discoveryId, index)
}
assert.equal(
stream.remaining,
Expand All @@ -108,6 +114,7 @@
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
/** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
stream.on('data', (entry) => entries.push(entry))
Expand Down Expand Up @@ -216,7 +223,7 @@
const stream = new MultiCoreIndexStream(indexStreams)
stream.on('data', (entry) => {
entries.push(entry)
stream.setIndexed(entry.key.toString('hex'), entry.index)
stream.setIndexed(entry.discoveryId, entry.index)
})

const expectedPromise = generateFixtures(cores, 1000)
Expand Down
2 changes: 1 addition & 1 deletion test/unit-tests/utils.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// @ts-check
const test = require('node:test')
const { test } = require('node:test')
const assert = require('node:assert/strict')
const { ExhaustivenessError } = require('../../lib/utils.js')

Expand Down
Loading