Skip to content

Commit

Permalink
chore: rename to CoreSyncState and update API (#308)
Browse files Browse the repository at this point in the history
* move and rename core-sync-state

* cleanup API for CoreSyncState

* delete test for old code

* Apply suggestions from code review

Co-authored-by: Andrew Chou <andrewchou@fastmail.com>

* rename param

* rename test file

---------

Co-authored-by: Andrew Chou <andrewchou@fastmail.com>
  • Loading branch information
gmaclennan and achou11 committed Oct 4, 2023
1 parent 79bff84 commit c8d6292
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 990 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { keyToId } from '../utils.js'
import RemoteBitfield, { BITS_PER_PAGE } from './remote-bitfield.js'
import RemoteBitfield, {
BITS_PER_PAGE,
} from '../core-manager/remote-bitfield.js'

/**
* @typedef {RemoteBitfield} Bitfield
Expand Down Expand Up @@ -31,29 +33,29 @@ import RemoteBitfield, { BITS_PER_PAGE } from './remote-bitfield.js'
* @property {Record<PeerId, RemotePeerSimpleState>} remoteStates map of state of all known peers
*/
/**
* @typedef {object} CoreReplicationEvents
* @typedef {object} CoreSyncEvents
* @property {() => void} update
*/

/**
* Track replication state for a core identified by `discoveryId`. Can start
* tracking state before the core instance exists locally, via the "preHave"
* messages received over the project creator core.
* Track sync state for a core identified by `discoveryId`. Can start tracking
* state before the core instance exists locally, via the "preHave" messages
* received over the project creator core.
*
* Because deriving the state is expensive (it iterates through the bitfields of
* all peers), this is designed to be pull-based: an `update` event signals that
* the state is updated, but does not pass the state. The consumer can "pull"
* the state when it wants it via `coreReplicationState.getState()`.
* the state when it wants it via `coreSyncState.getState()`.
*
* Each peer (including the local peer) has a state of:
* 1. `have` - number of blocks the peer has locally
* 2. `want` - number of blocks the peer wants, and at least one peer has
* 3. `wanted` - number of blocks the peer has that at least one peer wants
* 4. `missing` - number of blocks the peer wants but no peer has
*
* @extends {TypedEmitter<CoreReplicationEvents>}
* @extends {TypedEmitter<CoreSyncEvents>}
*/
export class CoreReplicationState extends TypedEmitter {
export class CoreSyncState extends TypedEmitter {
/** @type {import('hypercore')<'binary', Buffer>} */
#core
/** @type {InternalState['remoteStates']} */
Expand Down Expand Up @@ -92,10 +94,9 @@ export class CoreReplicationState extends TypedEmitter {
}

/**
* Attach a core. The replication state can be initialized without a core
* instance, because we could receive peer want and have states via extension
* messages before we have the core key that allows us to create a core
* instance.
* Attach a core. The sync state can be initialized without a core instance,
* because we could receive peer want and have states via extension messages
* before we have the core key that allows us to create a core instance.
*
* @param {import('hypercore')<'binary', Buffer>} core
*/
Expand Down Expand Up @@ -137,14 +138,15 @@ export class CoreReplicationState extends TypedEmitter {
/**
* Add a pre-emptive "have" bitfield for a peer. This is used when we receive
* a peer "have" via extension message - it allows us to have a state for the
* peer before the peer actually starts replicating this core
* peer before the peer actually starts syncing this core
*
* @param {PeerId} peerId
* @param {Bitfield} bitfield
* @param {number} start
* @param {Uint32Array} bitfield
*/
setHavesBitfield(peerId, bitfield) {
insertPreHaves(peerId, start, bitfield) {
const peerState = this.#getPeerState(peerId)
peerState.setPreHavesBitfield(bitfield)
peerState.insertPreHaves(start, bitfield)
this.#update()
}

Expand Down Expand Up @@ -225,40 +227,39 @@ export class CoreReplicationState extends TypedEmitter {
}

/**
* Replication state for a core for a peer. Uses an internal bitfield from
* Hypercore to track which blocks the peer has. Default is that a peer wants
* all blocks, but can set ranges of "wants". Setting a want range changes all
* other blocks to "not wanted"
* Sync state for a core for a peer. Uses an internal bitfield from Hypercore to
* track which blocks the peer has. Default is that a peer wants all blocks, but
* can set ranges of "wants". Setting a want range changes all other blocks to
* "not wanted"
*
* @private
* Only exported for testing
*/
export class PeerState {
/** @type {Bitfield | undefined} */
#preHaves
/** @type {Bitfield} */
#preHaves = new RemoteBitfield()
/** @type {Bitfield | undefined} */
#haves
/** @type {Bitfield | undefined} */
#wants
/** @type {Bitfield} */
#wants = new RemoteBitfield()
connected = false
#wantAll
constructor({ wantAll = true } = {}) {
this.#wantAll = wantAll
}
/**
* @param {Bitfield} bitfield
* @param {number} start
* @param {Uint32Array} bitfield
*/
setPreHavesBitfield(bitfield) {
this.#preHaves = bitfield
insertPreHaves(start, bitfield) {
return this.#preHaves.insert(start, bitfield)
}
/**
* @param {Bitfield} bitfield
*/
setHavesBitfield(bitfield) {
this.#haves = bitfield
}
/**
* @param {Bitfield} bitfield
*/
setWantsBitfield(bitfield) {
this.#wants = bitfield
}
/**
* Set a range of blocks that a peer wants. This is not part of the Hypercore
* protocol, so we need our own extension messages that a peer can use to
Expand All @@ -269,7 +270,7 @@ export class PeerState {
* @param {{ start: number, length: number }} range
*/
setWantRange({ start, length }) {
if (!this.#wants) this.#wants = new RemoteBitfield()
this.#wantAll = false
this.#wants.setRange(start, length, true)
}
/**
Expand All @@ -279,11 +280,7 @@ export class PeerState {
* @param {number} index
*/
have(index) {
return this.#haves
? this.#haves.get(index)
: this.#preHaves
? this.#preHaves.get(index)
: false
return this.#haves ? this.#haves.get(index) : this.#preHaves.get(index)
}
/**
* Return the "haves" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -294,16 +291,16 @@ export class PeerState {
*/
haveWord(index) {
if (this.#haves) return getBitfieldWord(this.#haves, index)
if (this.#preHaves) return getBitfieldWord(this.#preHaves, index)
return 0
return getBitfieldWord(this.#preHaves, index)
}
/**
* Returns whether this peer wants block at `index`. Defaults to `true` for
* all blocks
* @param {number} index
*/
want(index) {
return this.#wants ? this.#wants.get(index) : true
if (this.#wantAll) return true
return this.#wants.get(index)
}
/**
* Return the "wants" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -313,9 +310,11 @@ export class PeerState {
* the 32 blocks from `index`
*/
wantWord(index) {
if (this.#wants) return getBitfieldWord(this.#wants, index)
// This is a 32-bit number with all bits set
return 2 ** 32 - 1
if (this.#wantAll) {
// This is a 32-bit number with all bits set
return 2 ** 32 - 1
}
return getBitfieldWord(this.#wants, index)
}
}

Expand Down
Loading

0 comments on commit c8d6292

Please sign in to comment.