Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peer sync controller [1/2] #347

Merged
merged 20 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"fastify": ">= 4"
},
"dependencies": {
"@digidem/types": "^2.1.0",
"@digidem/types": "^2.2.0",
"@fastify/type-provider-typebox": "^3.3.0",
"@hyperswarm/secret-stream": "^6.1.2",
"@mapeo/crypto": "1.0.0-alpha.10",
Expand All @@ -127,7 +127,7 @@
"drizzle-orm": "0.28.2",
"eventemitter3": "^5.0.1",
"fastify-plugin": "^4.5.0",
"hypercore": "^10.9.0",
"hypercore": "10.17.0",
"hypercore-crypto": "^3.3.1",
"hyperdrive": "^11.5.3",
"hyperswarm": "^4.4.1",
Expand Down
6 changes: 3 additions & 3 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ export class CoreManager extends TypedEmitter {
// replicated to a stream if we want sharing of unknown auth cores to work.
protocol.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ (discoveryKey) => {
this.#handleDiscoveryKey(discoveryKey, stream)
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.handleDiscoveryKey(discoveryKey, stream)
}
)

Expand Down Expand Up @@ -395,7 +395,7 @@ export class CoreManager extends TypedEmitter {
* @param {Buffer} discoveryKey
* @param {any} stream
*/
async #handleDiscoveryKey(discoveryKey, stream) {
async handleDiscoveryKey(discoveryKey, stream) {
const discoveryId = discoveryKey.toString('hex')
const peer = await this.#findPeer(stream.remotePublicKey)
if (!peer) {
Expand Down
21 changes: 14 additions & 7 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import RemoteBitfield, {
* @property {number} missing blocks the peer wants but no peer has
*/
/**
* @typedef {CoreState & { connected: boolean }} PeerCoreState
* @typedef {CoreState & { status: 'disconnected' | 'connecting' | 'connected' }} PeerCoreState
*/
/**
* @typedef {object} DerivedState
* @property {number} coreLength known (sparse) length of the core
* @property {CoreState} localState local state
* @property {Record<PeerId, PeerCoreState>} remoteStates map of state of all known peers
* @property {{ [peerId in PeerId]: PeerCoreState }} remoteStates map of state of all known peers
*/

/**
Expand All @@ -50,7 +50,7 @@ import RemoteBitfield, {
*
*/
export class CoreSyncState {
/** @type {import('hypercore')<'binary', Buffer>} */
/** @type {import('hypercore')<'binary', Buffer> | undefined} */
#core
/** @type {InternalState['remoteStates']} */
#remoteStates = new Map()
Expand Down Expand Up @@ -174,7 +174,13 @@ export class CoreSyncState {

// Update state to ensure this peer is in the state and set to connected
const peerState = this.#getPeerState(peerId)
peerState.connected = true
peerState.status = 'connecting'

this.#core?.update({ wait: true }).then(() => {
// A peer should become connected
peerState.status = 'connected'
this.#update()
})

// A peer can have a pre-emptive "have" bitfield received via an extension
// message, but when the peer actually connects then we switch to the actual
Expand Down Expand Up @@ -205,7 +211,7 @@ export class CoreSyncState {
#onPeerRemove = (peer) => {
const peerId = keyToId(peer.remotePublicKey)
const peerState = this.#getPeerState(peerId)
peerState.connected = false
peerState.status = 'disconnected'
this.#update()
}
}
Expand All @@ -226,7 +232,8 @@ export class PeerState {
#haves
/** @type {Bitfield} */
#wants = new RemoteBitfield()
connected = false
/** @type {PeerCoreState['status']} */
status = 'disconnected'
#wantAll
constructor({ wantAll = true } = {}) {
this.#wantAll = wantAll
Expand Down Expand Up @@ -367,7 +374,7 @@ export function deriveState(coreState) {
}
for (let j = 1; j < peerStates.length; j++) {
const peerState = /** @type {PeerCoreState} */ (peerStates[j])
peerState.connected = peers[j].connected
peerState.status = peers[j].status
derivedState.remoteStates[peerIds[j]] = peerState
}
return derivedState
Expand Down
52 changes: 39 additions & 13 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class NamespaceSyncState {
if (this.#cachedState) return this.#cachedState
/** @type {SyncState} */
const state = {
localState: { want: 0, have: 0, wanted: 0, missing: 0 },
localState: createState(),
remoteStates: {},
}
for (const css of this.#coreStates.values()) {
Expand All @@ -65,9 +65,10 @@ export class NamespaceSyncState {
coreState.remoteStates
)) {
if (!(peerId in state.remoteStates)) {
state.remoteStates[peerId] = createPeerState(peerCoreState.connected)
state.remoteStates[peerId] = peerCoreState
} else {
mutatingAddPeerState(state.remoteStates[peerId], peerCoreState)
}
mutatingAddPeerState(state.remoteStates[peerId], peerCoreState)
}
}
this.#cachedState = state
Expand Down Expand Up @@ -108,9 +109,27 @@ export class NamespaceSyncState {
}
}

/** @returns {SyncState['remoteStates'][string]} */
function createPeerState(connected = false) {
return { want: 0, have: 0, wanted: 0, missing: 0, connected }
/**
* @overload
* @returns {SyncState['localState']}
*/

/**
* @overload
* @param {import('./core-sync-state.js').PeerCoreState['status']} status
* @returns {import('./core-sync-state.js').PeerCoreState}
*/

/**
* @param {import('./core-sync-state.js').PeerCoreState['status']} [status]
* @returns
gmaclennan marked this conversation as resolved.
Show resolved Hide resolved
*/
export function createState(status) {
if (status) {
return { want: 0, have: 0, wanted: 0, missing: 0, status }
} else {
return { want: 0, have: 0, wanted: 0, missing: 0 }
}
}

/**
Expand All @@ -122,24 +141,31 @@ function createPeerState(connected = false) {

/**
* @overload
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
* @returns {SyncState['remoteStates'][string]}
* @param {import('./core-sync-state.js').PeerCoreState} accumulator
* @param {import('./core-sync-state.js').PeerCoreState} currentValue
* @returns {import('./core-sync-state.js').PeerCoreState}
*/

/**
* Adds peer state in `currentValue` to peer state in `accumulator`
*
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
* @param {import('./core-sync-state.js').PeerCoreState} accumulator
* @param {import('./core-sync-state.js').PeerCoreState} currentValue
*/
function mutatingAddPeerState(accumulator, currentValue) {
accumulator.have += currentValue.have
accumulator.want += currentValue.want
accumulator.wanted += currentValue.wanted
accumulator.missing += currentValue.missing
if ('connected' in accumulator) {
accumulator.connected = accumulator.connected && currentValue.connected
if ('status' in accumulator && accumulator.status !== currentValue.status) {
if (currentValue.status === 'disconnected') {
accumulator.status === 'disconnected'
} else if (
currentValue.status === 'connecting' &&
accumulator.status === 'connected'
) {
accumulator.status = 'connecting'
}
}
return accumulator
}
Loading