Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename to CoreSyncState and update API #308

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { keyToId } from '../utils.js'
import RemoteBitfield, { BITS_PER_PAGE } from './remote-bitfield.js'
import RemoteBitfield, {
BITS_PER_PAGE,
} from '../core-manager/remote-bitfield.js'

/**
* @typedef {RemoteBitfield} Bitfield
Expand Down Expand Up @@ -31,29 +33,29 @@ import RemoteBitfield, { BITS_PER_PAGE } from './remote-bitfield.js'
* @property {Record<PeerId, RemotePeerSimpleState>} remoteStates map of state of all known peers
*/
/**
* @typedef {object} CoreReplicationEvents
* @typedef {object} CoreSyncEvents
* @property {() => void} update
*/

/**
* Track replication state for a core identified by `discoveryId`. Can start
* tracking state before the core instance exists locally, via the "preHave"
* messages received over the project creator core.
* Track sync state for a core identified by `discoveryId`. Can start tracking
* state before the core instance exists locally, via the "preHave" messages
* received over the project creator core.
*
* Because deriving the state is expensive (it iterates through the bitfields of
* all peers), this is designed to be pull-based: an `update` event signals that
* the state is updated, but does not pass the state. The consumer can "pull"
* the state when it wants it via `coreReplicationState.getState()`.
* the state when it wants it via `coreSyncState.getState()`.
*
* Each peer (including the local peer) has a state of:
* 1. `have` - number of blocks the peer has locally
* 2. `want` - number of blocks the peer wants, and at least one peer has
* 3. `wanted` - number of blocks the peer has that at least one peer wants
* 4. `missing` - number of blocks the peer wants but no peer has
*
* @extends {TypedEmitter<CoreReplicationEvents>}
* @extends {TypedEmitter<CoreSyncEvents>}
*/
export class CoreReplicationState extends TypedEmitter {
export class CoreSyncState extends TypedEmitter {
/** @type {import('hypercore')<'binary', Buffer>} */
#core
/** @type {InternalState['remoteStates']} */
Expand Down Expand Up @@ -92,10 +94,9 @@ export class CoreReplicationState extends TypedEmitter {
}

/**
* Attach a core. The replication state can be initialized without a core
* instance, because we could receive peer want and have states via extension
* messages before we have the core key that allows us to create a core
* instance.
* Attach a core. The sync state can be initialized without a core instance,
* because we could receive peer want and have states via extension messages
* before we have the core key that allows us to create a core instance.
*
* @param {import('hypercore')<'binary', Buffer>} core
*/
Expand Down Expand Up @@ -137,14 +138,15 @@ export class CoreReplicationState extends TypedEmitter {
/**
* Add a pre-emptive "have" bitfield for a peer. This is used when we receive
* a peer "have" via extension message - it allows us to have a state for the
* peer before the peer actually starts replicating this core
* peer before the peer actually starts syncing this core
*
* @param {PeerId} peerId
* @param {Bitfield} bitfield
* @param {number} start
* @param {Uint32Array} bitfield
*/
setHavesBitfield(peerId, bitfield) {
insertPreHaves(peerId, start, bitfield) {
const peerState = this.#getPeerState(peerId)
peerState.setPreHavesBitfield(bitfield)
peerState.insertPreHaves(start, bitfield)
this.#update()
}

Expand Down Expand Up @@ -225,40 +227,39 @@ export class CoreReplicationState extends TypedEmitter {
}

/**
* Replication state for a core for a peer. Uses an internal bitfield from
* Hypercore to track which blocks the peer has. Default is that a peer wants
* all blocks, but can set ranges of "wants". Setting a want range changes all
* other blocks to "not wanted"
* Sync state for a core for a peer. Uses an internal bitfield from Hypercore to
* track which blocks the peer has. Default is that a peer wants all blocks, but
* can set ranges of "wants". Setting a want range changes all other blocks to
* "not wanted"
*
* @private
* Only exported for testing
*/
export class PeerState {
/** @type {Bitfield | undefined} */
#preHaves
/** @type {Bitfield} */
#preHaves = new RemoteBitfield()
/** @type {Bitfield | undefined} */
#haves
/** @type {Bitfield | undefined} */
#wants
/** @type {Bitfield} */
#wants = new RemoteBitfield()
connected = false
#wantAll
constructor({ wantAll = true } = {}) {
this.#wantAll = wantAll
}
/**
* @param {Bitfield} bitfield
* @param {number} start
* @param {Uint32Array} bitfield
*/
setPreHavesBitfield(bitfield) {
this.#preHaves = bitfield
insertPreHaves(start, bitfield) {
return this.#preHaves.insert(start, bitfield)
}
/**
* @param {Bitfield} bitfield
*/
setHavesBitfield(bitfield) {
this.#haves = bitfield
}
/**
* @param {Bitfield} bitfield
*/
setWantsBitfield(bitfield) {
this.#wants = bitfield
}
/**
* Set a range of blocks that a peer wants. This is not part of the Hypercore
* protocol, so we need our own extension messages that a peer can use to
Expand All @@ -269,7 +270,7 @@ export class PeerState {
* @param {{ start: number, length: number }} range
*/
setWantRange({ start, length }) {
if (!this.#wants) this.#wants = new RemoteBitfield()
this.#wantAll = false
this.#wants.setRange(start, length, true)
}
/**
Expand All @@ -279,11 +280,7 @@ export class PeerState {
* @param {number} index
*/
have(index) {
return this.#haves
? this.#haves.get(index)
: this.#preHaves
? this.#preHaves.get(index)
: false
return this.#haves ? this.#haves.get(index) : this.#preHaves.get(index)
}
/**
* Return the "haves" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -294,16 +291,16 @@ export class PeerState {
*/
haveWord(index) {
if (this.#haves) return getBitfieldWord(this.#haves, index)
if (this.#preHaves) return getBitfieldWord(this.#preHaves, index)
return 0
return getBitfieldWord(this.#preHaves, index)
}
/**
* Returns whether this peer wants block at `index`. Defaults to `true` for
* all blocks
* @param {number} index
*/
want(index) {
return this.#wants ? this.#wants.get(index) : true
if (this.#wantAll) return true
return this.#wants.get(index)
}
/**
* Return the "wants" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -313,9 +310,11 @@ export class PeerState {
* the 32 blocks from `index`
*/
wantWord(index) {
if (this.#wants) return getBitfieldWord(this.#wants, index)
// This is a 32-bit number with all bits set
return 2 ** 32 - 1
if (this.#wantAll) {
// This is a 32-bit number with all bits set
return 2 ** 32 - 1
}
return getBitfieldWord(this.#wants, index)
}
}

Expand Down
Loading
Loading