feat!: discoveryId instead of key for entries
BREAKING CHANGE: this changes the type of `Entry` to have the core
discoveryId (the discovery key as a hex string) instead of the core key.

Since our mapeo-core code changed to use the discovery key for version
ids, we needed to calculate the discovery key for each entry, which has a
performance cost (it's a hash operation). This change avoids the need
for any hashing when processing entries.
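
To illustrate the change to `Entry` (a rough sketch: the hypercore-crypto package is assumed for the hash, and the `${discoveryId}/${index}` version-id format is illustrative, not mapeo-core's exact format):

const crypto = require('hypercore-crypto')

// Before: Entry.key was the core key (Buffer), so recovering the discovery
// key used for version ids cost one hash per entry.
function versionIdBefore(entry) {
  return `${crypto.discoveryKey(entry.key).toString('hex')}/${entry.index}`
}

// After: Entry.discoveryId is already the hex discovery key, computed once
// per core by CoreIndexStream, so no hashing is needed per entry.
function versionIdAfter(entry) {
  return `${entry.discoveryId}/${entry.index}`
}
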
gmaclennan committed Dec 6, 2023
1 parent 340fe31 commit 3ccc387
Showing 9 changed files with 69 additions and 39 deletions.
index.js (4 changes: 2 additions & 2 deletions)
@@ -128,8 +128,8 @@ class MultiCoreIndexer extends TypedEmitter {
/* istanbul ignore if - not sure this is necessary, but better safe than sorry */
if (!entries.length) return
await this.#batch(entries)
- for (const { key, index } of entries) {
- this.#indexStream.setIndexed(key.toString('hex'), index)
+ for (const { discoveryId, index } of entries) {
+ this.#indexStream.setIndexed(discoveryId, index)
}
const batchTime = Date.now() - this.#rateMeasurementStart
// Current rate entries per second
lib/core-index-stream.js (14 changes: 8 additions & 6 deletions)
@@ -36,6 +36,8 @@ class CoreIndexStream extends Readable {
#inProgressBitfield
#inProgress = 0
#core
+ /** @type {string | undefined} */
+ #discoveryId
/** @type {import('random-access-storage') | undefined} */
#storage
#createStorage
@@ -123,7 +125,8 @@ class CoreIndexStream extends Readable {
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
- this.#storage = this.#createStorage(getStorageName(discoveryKey))
+ this.#discoveryId = discoveryKey.toString('hex')
+ this.#storage = this.#createStorage(getStorageName(this.#discoveryId))
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
this.#core.on('append', this[kHandleAppend])
@@ -186,8 +189,8 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, true)
this.#inProgress++
/* istanbul ignore next: this should always be set at this point */
- if (!this.#core.key) throw new Error('Missing core key')
- const entry = { key: this.#core.key, block, index }
+ if (!this.#discoveryId) throw new Error('Missing core key')
+ const entry = { discoveryId: this.#discoveryId, block, index }
this.#readBufferAvailable = this.push(entry)
return true
}
@@ -207,9 +210,8 @@

exports.CoreIndexStream = CoreIndexStream

- /** @param {Buffer} discoveryKey */
- function getStorageName(discoveryKey) {
- const id = discoveryKey.toString('hex')
+ /** @param {string} id */
+ function getStorageName(id) {
return [id.slice(0, 2), id.slice(2, 4), id].join('/')
}
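
For context, getStorageName() shards the hex discovery id into two directory levels; the resulting storage paths are unchanged by this commit, only the hex conversion moved to the caller. A tiny worked example with a made-up id:

// 'deadbeef0011' -> 'de' / 'ad' / 'deadbeef0011'
getStorageName('deadbeef0011') // => 'de/ad/deadbeef0011'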

lib/multi-core-index-stream.js (6 changes: 3 additions & 3 deletions)
@@ -94,10 +94,10 @@ class MultiCoreIndexStream extends Readable {
stream.core
.ready()
.then(() => {
- const coreKey = stream.core.key
+ const discoveryKey = stream.core.discoveryKey
/* istanbul ignore next: this is set after ready */
- if (!coreKey) return
- this.#streamsById.set(coreKey.toString('hex'), stream)
+ if (!discoveryKey) return
+ this.#streamsById.set(discoveryKey.toString('hex'), stream)
})
.catch(noop)
this.#readable.add(stream)
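
For context, registering each stream under its hex discovery key matches the new Entry.discoveryId, so setIndexed(discoveryId, index) can find the right core stream with a plain Map lookup. A rough, simplified sketch of that lookup (not the exact source):

// Entries now carry discoveryId, so marking one as indexed is a direct
// Map lookup; no key-to-discovery-key conversion is needed by callers.
setIndexed(discoveryId, index) {
  const stream = this.#streamsById.get(discoveryId)
  if (!stream) return // core not ready / stream not registered yet
  stream.setIndexed(index)
}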
lib/types.ts (2 changes: 1 addition & 1 deletion)
@@ -28,7 +28,7 @@ export type ValueEncoding = 'binary' | 'utf-8' | 'json'

export interface Entry<T extends ValueEncoding = 'binary'> {
index: number
- key: Buffer
+ discoveryId: string
block: T extends 'binary' ? Buffer : T extends 'utf-8' ? string : JSONValue
}

test/helpers/index.js (18 changes: 10 additions & 8 deletions)
@@ -78,7 +78,9 @@ async function generateFixtures(cores, count) {
const offset = core.length
const blocks = generateFixture(offset, offset + count)
await core.append(blocks)
- entries.push.apply(entries, blocksToExpected(blocks, core.key, offset))
+ // @ts-expect-error - we know discoveryKey is set here
+ const discoveryId = core.discoveryKey.toString('hex')
+ entries.push.apply(entries, blocksToExpected(blocks, discoveryId, offset))
}
return entries
}
@@ -119,8 +121,8 @@ async function throttledDrain(emitter) {
* @returns number
*/
function sort(a, b) {
- const aKey = a.key.toString('hex') + a.block.toString()
- const bKey = b.key.toString('hex') + b.block.toString()
+ const aKey = a.discoveryId + a.block.toString()
+ const bKey = b.discoveryId + b.block.toString()
return aKey < bKey ? -1 : aKey > bKey ? 1 : 0
}

@@ -132,12 +134,12 @@ function sortEntries(e) {
/**
*
* @param {Buffer[]} blocks
- * @param {Buffer} key
- * @returns
+ * @param {string} discoveryId
+ * @returns {Entry[]}
*/
- function blocksToExpected(blocks, key, offset = 0) {
+ function blocksToExpected(blocks, discoveryId, offset = 0) {
return blocks.map((block, i) => ({
- key,
+ discoveryId,
block,
index: i + offset,
}))
@@ -159,7 +161,7 @@ async function createMultiple(n) {
function logEntries(e) {
console.log(
sortEntries(e).map((e) => ({
- key: e.key.toString('hex'),
+ discoveryId: e.discoveryId.slice(0, 7),
block: e.block.toString(),
index: e.index,
}))
test/multi-core-indexer.test.js (4 changes: 4 additions & 0 deletions)
@@ -206,6 +206,7 @@ test('Appends from a replicated core are indexed', async (t) => {
const remote = (remoteCores[i] = await create(core.key))
replicate(core, remoteCores[i], t)
await remote.update({ wait: true })
+ // @ts-ignore - Hypercore typings are missing
await remote.download({ start: 0, end: remote.length }).downloaded()
}
/** @type {Entry[]} */
@@ -495,6 +496,7 @@ test('Closing before batch complete should resume on next start', async (t) => {
await /** @type {Promise<void>} */ (
new Promise((res) => {
indexer1.on('index-state', onIndexState)
+ /** @param {import('../lib/types').IndexState} state */
function onIndexState(state) {
if (state.remaining > 2500) return
indexer1.off('index-state', onIndexState)
@@ -524,11 +526,13 @@ test('Closing before batch complete should resume on next start', async (t) => {

// This checks that storage names do not change between versions, which would be a breaking change
test('Consistent storage folders', async (t) => {
+ /** @type {string[]} */
const storageNames = []
const cores = []
for (const keyPair of testKeypairs.slice(0, 5)) {
cores.push(await create({ keyPair }))
}
+ /** @param {string} name */
function createStorage(name) {
storageNames.push(name)
return new ram()
test/tsconfig.json (8 changes: 8 additions & 0 deletions)
@@ -0,0 +1,8 @@
{
"compilerOptions": {
"noEmit": true,
"emitDeclarationOnly": false
},
"extends": "../tsconfig.json",
"include": ["**/*.js"]
}
test/unit-tests/core-index-stream.test.js (26 changes: 15 additions & 11 deletions)
@@ -33,7 +33,8 @@ test('destroy before open', async (t) => {
test('Indexes all items already in a core', async (t) => {
const a = await create()
const blocks = generateFixture(0, 10)
- const expected = blocksToExpected(blocks, a.key)
+ // @ts-ignore
+ const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex'))
await a.append(blocks)
/** @type {any[]} */
const entries = []
@@ -56,7 +57,8 @@ test('.remaining property is accurate', async (t) => {
const totalBlocks = 100
const a = await create()
const blocks = generateFixture(0, totalBlocks)
- const expected = blocksToExpected(blocks, a.key)
+ // @ts-ignore
+ const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex'))
await a.append(blocks)
/** @type {any[]} */
const entries = []
@@ -81,7 +83,8 @@ test('Indexes items appended after initial index', async (t) => {
stream.on('data', (entry) => entries.push(entry))
await once(stream, 'drained')
t.same(entries, [], 'no entries before append')
- const expected = blocksToExpected(blocks, a.key)
+ // @ts-ignore
+ const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex'))
await a.append(blocks)
await once(stream, 'drained')
t.same(entries, expected)
@@ -96,7 +99,7 @@ test('Readable stream from sparse hypercore', async (t) => {
replicate(a, b, t)

const range = b.download({ start: 5, end: 20 })
- await range.downloaded()
+ await range.done()

const stream = new CoreIndexStream(b, () => new ram())
/** @type {Buffer[]} */
@@ -106,7 +109,7 @@

t.same(entries, blocks.slice(5, 20))
const range2 = b.download({ start: 50, end: 60 })
- await Promise.all([range2.downloaded(), throttledDrain(stream)])
+ await Promise.all([range2.done(), throttledDrain(stream)])

t.same(
entries.sort(),
@@ -118,6 +121,7 @@ test("'indexing' and 'drained' events are paired", async (t) => {
const a = await create()
const blocks = generateFixture(0, 100)
await a.append(blocks)
+ // @ts-ignore
const b = await create(a.key)

replicate(a, b, t)
@@ -136,7 +140,7 @@
stream.resume()

const range = b.download({ start: 0, end: a.length })
- await Promise.all([range.downloaded(), throttledDrain(stream)])
+ await Promise.all([range.done(), throttledDrain(stream)])

t.equal(indexingEvents, idleEvents)
// This is just to check that we're actually testing something
@@ -152,7 +156,7 @@ test('Appends from a replicated core are indexed', async (t) => {
replicate(a, b, t)
await b.update({ wait: true })
const range1 = b.download({ start: 0, end: b.length })
- await range1.downloaded()
+ await range1.done()

const stream = new CoreIndexStream(b, () => new ram())
/** @type {Buffer[]} */
@@ -200,12 +204,12 @@ test('Maintains index state', async (t) => {
/**
*
* @param {Buffer[]} blocks
- * @param {Buffer} key
- * @returns
+ * @param {string} discoveryId
+ * @returns {import('../../types/index.js').Entry[]}
*/
- function blocksToExpected(blocks, key) {
+ function blocksToExpected(blocks, discoveryId) {
return blocks.map((block, i) => ({
- key,
+ discoveryId,
block,
index: i,
}))
test/unit-tests/multi-core-index-stream.test.js (26 changes: 18 additions & 8 deletions)
@@ -14,12 +14,15 @@ const {
sortEntries,
} = require('../helpers')

+ /** @typedef {import('../../types/index.js').Entry} Entry */

test('Indexes all items already in a core', async (t) => {
const cores = await createMultiple(5)
const expected = await generateFixtures(cores, 1000)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams)
const ws = new Writable({
Expand All @@ -46,6 +49,7 @@ test('Adding index streams after initialization', async (t) => {
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2))
stream.addStream(indexStreams[2])
Expand Down Expand Up @@ -77,13 +81,14 @@ test('.remaining is as expected', async (t) => {
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
const ws = new Writable({
writev: (data, cb) => {
entries.push(...data)
- for (const { key, index } of data) {
- stream.setIndexed(key.toString('hex'), index)
+ for (const { discoveryId, index } of data) {
+ stream.setIndexed(discoveryId, index)
}
t.equal(
stream.remaining,
@@ -110,6 +115,7 @@ test('Indexes items appended after initial index', async (t) => {
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
)
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
stream.on('data', (entry) => entries.push(entry))
@@ -126,7 +132,9 @@
test('index sparse hypercores', async (t) => {
const coreCount = 5
const localCores = await createMultiple(coreCount)
+ /** @type {Entry[]} */
const expected = []
+ /** @type {Entry[]} */
const expected2 = []
const indexStreams = []
const remoteCores = Array(coreCount)
@@ -140,9 +148,10 @@

for (const core of remoteCores) {
const range = core.download({ start: 5, end: 20 })
- await range.downloaded()
+ await range.done()
indexStreams.push(new CoreIndexStream(core, () => new ram()))
}
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
stream.on('data', (entry) => entries.push(entry))
@@ -152,9 +161,7 @@

await Promise.all([
throttledDrain(stream),
- ...remoteCores.map((core) =>
- core.download({ start: 50, end: 60 }).downloaded()
- ),
+ ...remoteCores.map((core) => core.download({ start: 50, end: 60 }).done()),
])

t.same(sortEntries(entries), sortEntries([...expected, ...expected2]))
@@ -171,9 +178,10 @@ test('Appends from a replicated core are indexed', async (t) => {
replicate(core, remoteCores[i], t)
await remote.update({ wait: true })
const range = remote.download({ start: 0, end: remote.length })
- await range.downloaded()
+ await range.done()
indexStreams.push(new CoreIndexStream(core, () => new ram()))
}
+ /** @type {Entry[]} */
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
stream.on('data', (entry) => entries.push(entry))
@@ -192,8 +200,10 @@ test('Appends from a replicated core are indexed', async (t) => {

test('Maintains index state', async (t) => {
const cores = await createMultiple(5)
+ /** @type {Array<(name: string) => import('random-access-storage')>} */
const storages = []
await generateFixtures(cores, 1000)
+ /** @type {Entry[]} */
const entries = []

for (const core of cores) {
@@ -213,7 +223,7 @@
const stream = new MultiCoreIndexStream(indexStreams)
stream.on('data', (entry) => {
entries.push(entry)
- stream.setIndexed(entry.key.toString('hex'), entry.index)
+ stream.setIndexed(entry.discoveryId, entry.index)
})

const expectedPromise = generateFixtures(cores, 1000)
