diff --git a/benchmarks/core-replication-state.js b/benchmarks/core-replication-state.js new file mode 100644 index 00000000..ce1b644f --- /dev/null +++ b/benchmarks/core-replication-state.js @@ -0,0 +1,46 @@ +import bench from 'nanobench' +import { + PeerState, + deriveState, +} from '../src/core-manager/core-replication-state.js' +import RemoteBitfield from '../src/core-manager/remote-bitfield.js' +import createRandom from 'math-random-seed' + +bench('deriveState x 10,000; 10 peers; 10,000 blocks', function (b) { + const length = 10000 + const remotePeers = new Map() + for (let i = 0; i < 10; i++) { + remotePeers.set(i, createPeer(length, i + '')) + } + const localPeer = createPeer(length, 'local') + + const state = { + length, + localState: localPeer, + remoteStates: remotePeers, + } + + b.start() + for (let i = 0; i < 10000; i++) { + deriveState(state) + } + b.end() +}) + +function createPeer(length, seed) { + const peer = new PeerState() + peer.setHavesBitfield(createBitfield(length, seed)) + return peer +} + +function createBitfield(length, seed) { + const random = createRandom(seed) + const b = new RemoteBitfield() + let i = random() < 0.5 ? 0 : Math.ceil(random() * 200) + while (i < length) { + const length = Math.ceil(random() * 200) + b.setRange(i, length, true) + i += length + Math.ceil(random() * 200) + } + return b +} diff --git a/package-lock.json b/package-lock.json index 71d2c0fa..d04a3137 100644 --- a/package-lock.json +++ b/package-lock.json @@ -70,6 +70,7 @@ "eslint": "^8.39.0", "fastify": "^4.20.0", "light-my-request": "^5.10.0", + "math-random-seed": "^2.0.0", "nanobench": "^3.0.0", "npm-run-all": "^4.1.5", "prettier": "^2.8.8", @@ -4611,6 +4612,15 @@ "node": ">= 12" } }, + "node_modules/math-random-seed": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/math-random-seed/-/math-random-seed-2.0.0.tgz", + "integrity": "sha512-QJ7/oqB5MExXjyJCUC8tQGIHo9GL2vB9YY3i16qZCige9SQBJrQTtF/eBG8rFZU/xQxLKIbzeYeOuRYuzFrncg==", + "dev": true, + "dependencies": { + "random-bytes-seed": "^1.0.3" + } + }, "node_modules/memoizee": { "version": "0.4.15", "dev": true, @@ -4827,8 +4837,9 @@ }, "node_modules/nanobench": { "version": "3.0.0", + "resolved": "https://registry.npmjs.org/nanobench/-/nanobench-3.0.0.tgz", + "integrity": "sha512-PaNQpZVmB/hFDQSBExgEJ5/1nzdqaIMUXzwWxFCYRl6j8lFt6kdNAUy1ieyPeMYWQk3AAmRg6UQn4g96J1SgLA==", "dev": true, - "license": "MIT", "dependencies": { "chalk": "^5.0.1", "mutexify": "^1.4.0", @@ -5949,6 +5960,12 @@ "version": "1.0.0", "license": "MIT" }, + "node_modules/random-bytes-seed": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/random-bytes-seed/-/random-bytes-seed-1.0.3.tgz", + "integrity": "sha512-O+eniMt8Sj2iAn2q1x5VEirS/XvbtwYcXNDbOAcRtGN+OhC48cmzS5ksf9qEHRVKC1I8A4qzjucNVElddofB0A==", + "dev": true + }, "node_modules/rc": { "version": "1.2.8", "license": "(BSD-2-Clause OR MIT OR Apache-2.0)", diff --git a/package.json b/package.json index a1814c54..c720523a 100644 --- a/package.json +++ b/package.json @@ -79,6 +79,7 @@ "eslint": "^8.39.0", "fastify": "^4.20.0", "light-my-request": "^5.10.0", + "math-random-seed": "^2.0.0", "nanobench": "^3.0.0", "npm-run-all": "^4.1.5", "prettier": "^2.8.8", diff --git a/src/core-manager/core-replication-state.js b/src/core-manager/core-replication-state.js new file mode 100644 index 00000000..27355f11 --- /dev/null +++ b/src/core-manager/core-replication-state.js @@ -0,0 +1,418 @@ +import { TypedEmitter } from 'tiny-typed-emitter' +import { keyToId } from '../utils.js' +import RemoteBitfield, { BITS_PER_PAGE } from './remote-bitfield.js' + +/** + * @typedef {RemoteBitfield} Bitfield + */ +/** + * @typedef {string} PeerId + */ +/** + * @typedef {Object} InternalState + * @property {number | undefined} length Core length, e.g. how many blocks in the core (including blocks that are not downloaded) + * @property {PeerState} localState + * @property {Map} remoteStates + */ +/** + * @typedef {object} PeerSimpleState + * @property {number} have blocks the peer has locally + * @property {number} want blocks the peer wants, and at least one peer has + * @property {number} wanted blocks the peer has that at least one peer wants + * @property {number} missing blocks the peer wants but no peer has + */ +/** + * @typedef {PeerSimpleState & { connected: boolean }} RemotePeerSimpleState + */ +/** + * @typedef {object} DerivedState + * @property {number} coreLength known (sparse) length of the core + * @property {PeerSimpleState} localState local state + * @property {Record} remoteStates map of state of all known peers + */ +/** + * @typedef {object} CoreReplicationEvents + * @property {() => void} update + */ + +/** + * Track replication state for a core identified by `discoveryId`. Can start + * tracking state before the core instance exists locally, via the "preHave" + * messages received over the project creator core. + * + * Because deriving the state is expensive (it iterates through the bitfields of + * all peers), this is designed to be pull-based: an `update` event signals that + * the state is updated, but does not pass the state. The consumer can "pull" + * the state when it wants it via `coreReplicationState.getState()`. + * + * Each peer (including the local peer) has a state of: + * 1. `have` - number of blocks the peer has locally + * 2. `want` - number of blocks the peer wants, and at least one peer has + * 3. `wanted` - number of blocks the peer has that at least one peer wants + * 4. `missing` - number of blocks the peer wants but no peer has + * + * @extends {TypedEmitter} + */ +export class CoreReplicationState extends TypedEmitter { + /** @type {import('hypercore')<'binary', Buffer>} */ + #core + /** @type {InternalState['remoteStates']} */ + #remoteStates = new Map() + /** @type {InternalState['localState']} */ + #localState = new PeerState() + #discoveryId + /** @type {DerivedState | null} */ + #cachedState = null + + /** + * @param {string} discoveryId Discovery ID for the core that this is representing + */ + constructor(discoveryId) { + super() + this.#discoveryId = discoveryId + } + + /** @type {() => DerivedState} */ + getState() { + if (this.#cachedState) return this.#cachedState + return deriveState({ + length: this.#core?.length, + localState: this.#localState, + remoteStates: this.#remoteStates, + }) + } + + /** + * Called whenever the state changes, so we clear the cache because next call + * to getState() will need to re-derive the state + */ + #update() { + this.#cachedState = null + this.emit('update') + } + + /** + * Attach a core. The replication state can be initialized without a core + * instance, because we could receive peer want and have states via extension + * messages before we have the core key that allows us to create a core + * instance. + * + * @param {import('hypercore')<'binary', Buffer>} core + */ + attachCore(core) { + // @ts-ignore - we know discoveryKey exists here + const discoveryId = keyToId(core.discoveryKey) + if (discoveryId !== this.#discoveryId) { + throw new Error('discoveryId does not match') + } + if (this.#core) return + + this.#core = core + this.#localState.setHavesBitfield( + // @ts-ignore - internal property + core?.core?.bitfield + ) + + for (const peer of this.#core.peers) { + this.#onPeerAdd(peer) + } + + this.#core.on('peer-add', this.#onPeerAdd) + + this.#core.on('peer-remove', this.#onPeerRemove) + + // TODO: Maybe we need to also wait on core.update() and then emit state? + + // These events happen when the local bitfield changes, so we want to emit + // state because it will have changed + this.#core.on('download', () => { + this.#update() + }) + + this.#core.on('append', () => { + this.#update() + }) + } + + /** + * Add a pre-emptive "have" bitfield for a peer. This is used when we receive + * a peer "have" via extension message - it allows us to have a state for the + * peer before the peer actually starts replicating this core + * + * @param {PeerId} peerId + * @param {Bitfield} bitfield + */ + setHavesBitfield(peerId, bitfield) { + const peerState = this.#getPeerState(peerId) + peerState.setPreHavesBitfield(bitfield) + this.#update() + } + + /** + * Add a ranges of wanted blocks for a peer. By default a peer wants all + * blocks in a core - calling this will change the peer to only want the + * blocks/ranges that are added here + * + * @param {PeerId} peerId + * @param {Array<{ start: number, length: number }>} ranges + */ + setPeerWants(peerId, ranges) { + const peerState = this.#getPeerState(peerId) + for (const { start, length } of ranges) { + peerState.setWantRange({ start, length }) + } + this.#update() + } + + /** + * @param {PeerId} peerId + */ + #getPeerState(peerId) { + let peerState = this.#remoteStates.get(peerId) + if (!peerState) { + peerState = new PeerState() + this.#remoteStates.set(peerId, peerState) + } + return peerState + } + + /** + * Handle a peer being added to the core - updates state and adds listeners to + * emit state updates whenever the peer remote bitfield changes + * + * (defined as class field to bind to `this`) + * @param {any} peer + */ + #onPeerAdd = (peer) => { + const peerId = keyToId(peer.remotePublicKey) + + // Update state to ensure this peer is in the state and set to connected + const peerState = this.#getPeerState(peerId) + peerState.connected = true + + // A peer can have a pre-emptive "have" bitfield received via an extension + // message, but when the peer actually connects then we switch to the actual + // bitfield from the peer object + peerState.setHavesBitfield(peer.remoteBitfield) + this.#update() + + // We want to emit state when a peer's bitfield changes, which can happen as + // a result of these two internal calls. + const originalOnBitfield = peer.onbitfield + const originalOnRange = peer.onrange + peer.onbitfield = (/** @type {any[]} */ ...args) => { + originalOnBitfield.apply(peer, args) + this.#update() + } + peer.onrange = (/** @type {any[]} */ ...args) => { + originalOnRange.apply(peer, args) + this.#update() + } + } + + /** + * Handle a peer being removed - keeps it in state, but sets state.connected = false + * + * (defined as class field to bind to `this`) + * @param {any} peer + */ + #onPeerRemove = (peer) => { + const peerId = keyToId(peer.remotePublicKey) + const peerState = this.#getPeerState(peerId) + peerState.connected = false + this.#update() + } +} + +/** + * Replication state for a core for a peer. Uses an internal bitfield from + * Hypercore to track which blocks the peer has. Default is that a peer wants + * all blocks, but can set ranges of "wants". Setting a want range changes all + * other blocks to "not wanted" + * + * @private + * Only exported for testing + */ +export class PeerState { + /** @type {Bitfield | undefined} */ + #preHaves + /** @type {Bitfield | undefined} */ + #haves + /** @type {Bitfield | undefined} */ + #wants + connected = false + /** + * @param {Bitfield} bitfield + */ + setPreHavesBitfield(bitfield) { + this.#preHaves = bitfield + } + /** + * @param {Bitfield} bitfield + */ + setHavesBitfield(bitfield) { + this.#haves = bitfield + } + /** + * @param {Bitfield} bitfield + */ + setWantsBitfield(bitfield) { + this.#wants = bitfield + } + /** + * Set a range of blocks that a peer wants. This is not part of the Hypercore + * protocol, so we need our own extension messages that a peer can use to + * inform us which blocks they are interested in. For most cores peers always + * want all blocks, but for blob cores often peers only want preview or + * thumbnail versions of media + * + * @param {{ start: number, length: number }} range + */ + setWantRange({ start, length }) { + if (!this.#wants) this.#wants = new RemoteBitfield() + this.#wants.setRange(start, length, true) + } + /** + * Returns whether the peer has the block at `index`. If a pre-have bitfield + * has been passed, this is used if no connected peer bitfield is available. + * If neither bitfield is available then this defaults to `false` + * @param {number} index + */ + have(index) { + return this.#haves + ? this.#haves.get(index) + : this.#preHaves + ? this.#preHaves.get(index) + : false + } + /** + * Return the "haves" for the 32 blocks from `index`, as a 32-bit integer + * + * @param {number} index + * @returns {number} 32-bit number representing whether the peer has or not + * the 32 blocks from `index` + */ + haveWord(index) { + if (this.#haves) return getBitfieldWord(this.#haves, index) + if (this.#preHaves) return getBitfieldWord(this.#preHaves, index) + return 0 + } + /** + * Returns whether this peer wants block at `index`. Defaults to `true` for + * all blocks + * @param {number} index + */ + want(index) { + return this.#wants ? this.#wants.get(index) : true + } + /** + * Return the "wants" for the 32 blocks from `index`, as a 32-bit integer + * + * @param {number} index + * @returns {number} 32-bit number representing whether the peer wants or not + * the 32 blocks from `index` + */ + wantWord(index) { + if (this.#wants) return getBitfieldWord(this.#wants, index) + // This is a 32-bit number with all bits set + return 2 ** 32 - 1 + } +} + +/** + * Derive count for each peer: "want"; "have"; "wanted". There is definitely a + * more performant and clever way of doing this, but at least with this + * implementation I can understand what I am doing. + * + * @param {InternalState} coreState + * + * @private + * Only exporteed for testing + */ +export function deriveState(coreState) { + const peerIds = ['local', ...coreState.remoteStates.keys()] + const peers = [coreState.localState, ...coreState.remoteStates.values()] + + /** @type {PeerSimpleState[]} */ + const peerStates = new Array(peers.length) + const length = coreState.length || 0 + for (let i = 0; i < peerStates.length; i++) { + peerStates[i] = { want: 0, have: 0, wanted: 0, missing: 0 } + } + const haves = new Array(peerStates.length) + let want = 0 + for (let i = 0; i < length; i += 32) { + const truncate = 2 ** Math.min(32, length - i) - 1 + let someoneHasIt = 0 + for (let j = 0; j < peers.length; j++) { + haves[j] = peers[j].haveWord(i) & truncate + someoneHasIt |= haves[j] + peerStates[j].have += bitCount32(haves[j]) + } + let someoneWantsIt = 0 + for (let j = 0; j < peers.length; j++) { + // A block is a want if: + // 1. The peer wants it + // 2. They don't have it + // 3. Someone does have it + const wouldLikeIt = peers[j].wantWord(i) & ~haves[j] + want = wouldLikeIt & someoneHasIt + someoneWantsIt |= want + peerStates[j].want += bitCount32(want) + // A block is missing if: + // 1. The peer wants it + // 2. The peer doesn't have it + // 3. No other peer has it + // Need to truncate to the core length, since otherwise we would get + // missing values beyond core length + const missing = wouldLikeIt & ~someoneHasIt & truncate + peerStates[j].missing += bitCount32(missing) + } + for (let j = 0; j < peerStates.length; j++) { + // A block is wanted if: + // 1. Someone wants it + // 2. The peer has it + const wanted = someoneWantsIt & haves[j] + peerStates[j].wanted += bitCount32(wanted) + } + } + /** @type {DerivedState} */ + const derivedState = { + coreLength: length, + localState: peerStates[0], + remoteStates: {}, + } + for (let j = 1; j < peerStates.length; j++) { + const peerState = /** @type {RemotePeerSimpleState} */ (peerStates[j]) + peerState.connected = peers[j].connected + derivedState.remoteStates[peerIds[j]] = peerState + } + return derivedState +} + +/** + * Apologies for the obscure code. From + * https://stackoverflow.com/a/109025/903300 + * @param {number} n + */ +export function bitCount32(n) { + n = n - ((n >> 1) & 0x55555555) + n = (n & 0x33333333) + ((n >> 2) & 0x33333333) + return (((n + (n >> 4)) & 0xf0f0f0f) * 0x1010101) >> 24 +} + +/** + * Get a 32-bit "chunk" (word) of the bitfield. + * + * @param {RemoteBitfield} bitfield + * @param {number} index + */ +function getBitfieldWord(bitfield, index) { + if (index % 32 !== 0) throw new Error('Index must be multiple of 32') + const j = index & (BITS_PER_PAGE - 1) + const i = (index - j) / BITS_PER_PAGE + + const p = bitfield._pages.get(i) + + return p ? p.bitfield[j / 32] : 0 +} diff --git a/src/core-manager/remote-bitfield.js b/src/core-manager/remote-bitfield.js index 02291e8c..fa206068 100644 --- a/src/core-manager/remote-bitfield.js +++ b/src/core-manager/remote-bitfield.js @@ -5,7 +5,7 @@ import BigSparseArray from 'big-sparse-array' import { quickbit } from './compat.js' -const BITS_PER_PAGE = 32768 +export const BITS_PER_PAGE = 32768 const BYTES_PER_PAGE = BITS_PER_PAGE / 8 const WORDS_PER_PAGE = BYTES_PER_PAGE / 4 const BITS_PER_SEGMENT = 2097152 diff --git a/tests/core-replication-state.js b/tests/core-replication-state.js new file mode 100644 index 00000000..de187541 --- /dev/null +++ b/tests/core-replication-state.js @@ -0,0 +1,435 @@ +import NoiseSecretStream from '@hyperswarm/secret-stream' +import test from 'brittle' +import { + deriveState, + PeerState, + CoreReplicationState, + bitCount32, +} from '../src/core-manager/core-replication-state.js' +import RemoteBitfield, { + BITS_PER_PAGE, +} from '../src/core-manager/remote-bitfield.js' +import { createCore } from './helpers/index.js' +// import { setTimeout } from 'timers/promises' +import { once } from 'node:events' +import pTimeout from 'p-timeout' + +/** + * @type {Array<{ + * message: string, + * state: { + * length: number, + * localState: Parameters[0], + * remoteStates: Array[0]> + * }, + * expected: import('../src/core-manager/core-replication-state.js').DerivedState + * }>} + */ +const scenarios = [ + { + message: '3 peers, start with haves, test want, have, wanted and missing', + state: { + length: 4, + localState: { have: 0b0111 }, + remoteStates: [{ have: 0b0011 }, { have: 0b0101 }, { have: 0b0001 }], + }, + expected: { + coreLength: 4, + localState: { want: 0, have: 3, wanted: 2, missing: 1 }, + remoteStates: { + peer0: { want: 1, have: 2, wanted: 1, missing: 1, connected: false }, + peer1: { want: 1, have: 2, wanted: 1, missing: 1, connected: false }, + peer2: { want: 2, have: 1, wanted: 0, missing: 1, connected: false }, + }, + }, + }, + { + message: 'No bitfields', + state: { + length: 4, + localState: { have: 0 }, // always have a bitfield for this + remoteStates: [{}, {}], + }, + expected: { + coreLength: 4, + localState: { want: 0, have: 0, wanted: 0, missing: 4 }, + remoteStates: { + peer0: { want: 0, have: 0, wanted: 0, missing: 4, connected: false }, + peer1: { want: 0, have: 0, wanted: 0, missing: 4, connected: false }, + }, + }, + }, + { + message: 'connected = true', + state: { + length: 3, + localState: { have: 0b111 }, + remoteStates: [{ have: 0b001, want: 0b011, connected: true }], + }, + expected: { + coreLength: 3, + localState: { want: 0, have: 3, wanted: 1, missing: 0 }, + remoteStates: { + peer0: { want: 1, have: 1, wanted: 0, missing: 0, connected: true }, + }, + }, + }, + { + message: 'test starting with wants', + state: { + length: 3, + localState: { have: 0b111 }, + remoteStates: [{ have: 0b001, want: 0b011 }], + }, + expected: { + coreLength: 3, + localState: { want: 0, have: 3, wanted: 1, missing: 0 }, + remoteStates: { + peer0: { want: 1, have: 1, wanted: 0, missing: 0, connected: false }, + }, + }, + }, + { + message: 'test starting with prehaves', + state: { + length: 3, + localState: { have: 0b111 }, + remoteStates: [{ prehave: 0b011 }], + }, + expected: { + coreLength: 3, + localState: { want: 0, have: 3, wanted: 1, missing: 0 }, + remoteStates: { + peer0: { want: 1, have: 2, wanted: 0, missing: 0, connected: false }, + }, + }, + }, + { + message: 'test starting with prehaves, then haves', + state: { + length: 3, + localState: { have: 0b111 }, + remoteStates: [{ prehave: 0b011, have: 0b111 }], + }, + expected: { + coreLength: 3, + localState: { want: 0, have: 3, wanted: 0, missing: 0 }, + remoteStates: { + peer0: { want: 0, have: 3, wanted: 0, missing: 0, connected: false }, + }, + }, + }, + { + message: 'test length > 32', + state: { + length: 72, + localState: { have: 2 ** 50 - 1 }, + remoteStates: [ + { have: 2 ** 40 - 1 }, + { have: BigInt(2 ** 40 - 1) << BigInt(10), want: (2 ** 10 - 1) << 5 }, + { have: BigInt(2 ** 40 - 1) << BigInt(10), want: (2 ** 10 - 1) << 5 }, + ], + }, + expected: { + coreLength: 72, + localState: { want: 0, have: 50, wanted: 15, missing: 22 }, + remoteStates: { + peer0: { want: 10, have: 40, wanted: 5, missing: 22, connected: false }, + peer1: { want: 5, have: 40, wanted: 10, missing: 0, connected: false }, + peer2: { want: 5, have: 40, wanted: 10, missing: 0, connected: false }, + }, + }, + }, + { + message: 'haves and wants beyond length', + state: { + length: 2, + localState: { have: 0b1111 }, + remoteStates: [{ have: 0, want: 0b1110 }, { have: 0 }], + }, + expected: { + coreLength: 2, + localState: { want: 0, have: 2, wanted: 2, missing: 0 }, + remoteStates: { + peer0: { want: 1, have: 0, wanted: 0, missing: 0, connected: false }, + peer1: { want: 2, have: 0, wanted: 0, missing: 0, connected: false }, + }, + }, + }, +] + +test('deriveState() scenarios', (t) => { + for (const { state, expected, message } of scenarios) { + const derivedState = deriveState({ + length: state.length, + localState: createState(state.localState), + remoteStates: new Map( + state.remoteStates.map((s, i) => ['peer' + i, createState(s)]) + ), + }) + t.alike(derivedState, expected, message) + } +}) + +test('deriveState() have at index beyond bitfield page size', (t) => { + const localState = createState({ have: 2 ** 10 - 1 }) + const remoteState = new PeerState() + const remoteHaveBitfield = new RemoteBitfield() + remoteHaveBitfield.set(BITS_PER_PAGE - 1 + 10, true) + remoteState.setHavesBitfield(remoteHaveBitfield) + const state = { + length: BITS_PER_PAGE + 10, + localState, + remoteStates: new Map([['peer0', remoteState]]), + } + const expected = { + coreLength: BITS_PER_PAGE + 10, + localState: { + want: 1, + have: 10, + wanted: 10, + missing: BITS_PER_PAGE - 1, + }, + remoteStates: { + peer0: { + want: 10, + have: 1, + wanted: 1, + missing: BITS_PER_PAGE - 1, + connected: false, + }, + }, + } + t.alike(deriveState(state), expected) +}) + +test('CoreReplicationState', async (t) => { + for (const { state, expected, message } of scenarios) { + const localCore = await createCore() + await localCore.ready() + const crs = new CoreReplicationState(localCore.discoveryKey.toString('hex')) + crs.attachCore(localCore) + const blocks = new Array(state.length).fill('block') + await localCore.append(blocks) + const downloadPromises = [] + const seed = Buffer.alloc(32) + seed.write('local') + const kp1 = NoiseSecretStream.keyPair(seed) + const peerIds = new Map() + const connectedState = new Map() + for (const [ + index, + { have, want, prehave }, + ] of state.remoteStates.entries()) { + const seed = Buffer.allocUnsafe(32).fill(index) + const kp2 = NoiseSecretStream.keyPair(seed) + const peerId = kp2.publicKey.toString('hex') + peerIds.set('peer' + index, peerId) + connectedState.set(peerId, false) + + // We unit test deriveState with no bitfields, but we need something here + // for things to work + crs.setHavesBitfield(peerId, createBitfield(prehave || 0)) + if (typeof have !== 'number' && typeof want !== 'number') continue + connectedState.set(peerId, true) + const core = await createCore(localCore.key) + setPeerWants(crs, peerId, want) + replicate(localCore, core, { kp1, kp2 }) + await core.update({ wait: true }) + downloadPromises.push(downloadCore(core, have)) + } + await Promise.all(downloadPromises) + await clearCore(localCore, state.localState.have) + const expectedRemoteStates = {} + for (const [key, value] of Object.entries(expected.remoteStates)) { + const peerId = peerIds.get(key) + expectedRemoteStates[peerId] = { + ...value, + connected: connectedState.get(peerId), + } + } + await updateWithTimeout(crs, 100) + t.alike( + crs.getState(), + { ...expected, remoteStates: expectedRemoteStates }, + message + ) + } +}) + +// This takes several hours to run on my M2 Macbook Pro (it's the slowBitCount +// that takes a long time - bitCount32 takes about 23 seconds), so not running +// this by default. The test did pass when I ran it though. +test.skip('bitCount32', (t) => { + for (let n = 0; n < 2 ** 32; n++) { + if (n % 2 ** 28 === 0) console.log(n) + const bitCount = bitCount32(n) + const expected = slowBitCount(n) + if (bitCount !== expected) t.fail('bitcount is correct ' + n) + } +}) + +/** + * Slow but understandable implementation to compare with fast obscure implementation + * @param {number} n + */ +function slowBitCount(n) { + return n.toString(2).replace(/0/g, '').length +} + +/** + * + * @param {{ have?: number, prehave?: number, want?: number, connected?: number }} param0 + */ +function createState({ have, prehave, want, connected }) { + const peerState = new PeerState() + if (prehave) { + const bitfield = createBitfield(prehave) + peerState.setPreHavesBitfield(bitfield) + } + if (have) { + const bitfield = createBitfield(have) + peerState.setHavesBitfield(bitfield) + } + if (want) { + const bitfield = createBitfield(want) + peerState.setWantsBitfield(bitfield) + } + if (typeof connected === 'boolean') peerState.connected = connected + return peerState +} + +/** + * Create a bitfield from a number, e.g. `createBitfield(0b1011)` will create a + * bitfield with the 1st, 2nd and 4th bits set. + * @param {number} bits + */ +function createBitfield(bits) { + if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + const bitfield = new RemoteBitfield() + const bigInt = BigInt(bits) + // 53 because the max safe integer in JS is 53 bits + for (let i = 0; i < 53; i++) { + bitfield.set(i, !!((bigInt >> BigInt(i)) & 1n)) + } + return bitfield +} + +/** + * + * @param {import('hypercore')} core + * @param {number} [bits] + */ +async function clearCore(core, bits) { + if (typeof bits === 'undefined') return + if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + await core.ready() + const bigInt = BigInt(bits) + const promises = [] + // 53 because the max safe integer in JS is 53 bits + for (let i = 0; i < core.length; i++) { + if ((bigInt >> BigInt(i)) & 1n) continue + promises.push(core.clear(i)) + } + await Promise.all(promises) +} + +/** + * + * @param {import('hypercore')} core + * @param {number} [bits] + */ +async function downloadCore(core, bits) { + if (typeof bits === 'undefined') return + if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + await core.ready() + const bigInt = BigInt(bits) + const blocks = [] + // 53 because the max safe integer in JS is 53 bits + for (let i = 0; i < core.length; i++) { + if ((bigInt >> BigInt(i)) & 1n) { + blocks.push(i) + } + } + await core.download({ blocks }).done() +} + +/** + * + * @param {CoreReplicationState} crs + * @param {string} peerId + * @param {number} [bits] + */ +function setPeerWants(crs, peerId, bits) { + if (typeof bits === 'undefined') return + if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + const bigInt = BigInt(bits) + /** @type {{ start: number, length: number}} */ + const ranges = [] + // 53 because the max safe integer in JS is 53 bits + for (let i = 0; i < 53; i++) { + if ((bigInt >> BigInt(i)) & 1n) { + ranges.push({ start: i, length: 1 }) + } + } + crs.setPeerWants(peerId, ranges) +} + +/** + * Wait for update event with a timeout + * @param {CoreReplicationState} crs + * @param {number} milliseconds + */ +async function updateWithTimeout(crs, milliseconds) { + return pTimeout(once(crs, 'update'), { milliseconds, message: false }) +} + +/** + * @param {import('hypercore')} core1 + * @param {import('hypercore')} core2 + * @param { {kp1?: import('@hyperswarm/secret-stream'), kp2?: import('@hyperswarm/secret-stream')} } [keyPairs] + * @returns {() => Promise<[void, void]>} + */ +export function replicate( + core1, + core2, + { + // Keep keypairs deterministic for tests, since we use peer.publicKey as an identifier. + kp1 = NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(0)), + kp2 = NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(1)), + } = {} +) { + const n1 = new NoiseSecretStream(true, undefined, { + keyPair: kp1, + }) + const n2 = new NoiseSecretStream(false, undefined, { + keyPair: kp2, + }) + + // @ts-expect-error + n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) + + // @ts-expect-error + core1.replicate(n1) + // @ts-expect-error + core2.replicate(n2) + + return async function destroy() { + return Promise.all([ + /** @type {Promise} */ + ( + new Promise((res) => { + n1.on('close', res) + n1.destroy() + }) + ), + /** @type {Promise} */ + ( + new Promise((res) => { + n2.on('close', res) + n2.destroy() + }) + ), + ]) + } +}