Skip to content

Commit

Permalink
feat: add $sync API methods (#361)
Browse files Browse the repository at this point in the history
* WIP initial work

* rename Rpc to LocalPeers

* Handle deviceInfo internally, id -> deviceId

* Tests for stream error handling

* remove unnecessary constructor

* return replication stream

* Attach protomux instance to peer info

* rename and re-organize

* revert changes outside scope of PR

* WIP initial work

* Tie everything together

* rename getProjectInstance

* feat: client.listLocalPeers() & `local-peers` evt

* feat: add $sync API methods

For now this simplifies the API (because we are only supporting local
sync, not remote sync over the internet) to:

- `project.$sync.getState()`
- `project.$sync.start()`
- `project.$sync.stop()`
- Events
    - `sync-state`

It's currently not possible to stop local discovery, nor is it possible
to stop sync of the metadata namespaces (auth, config, blobIndex). The
start and stop methods stop the sync of the data and blob namespaces.

Fixes #134. Stacked on #360, #358 and #356.
  • Loading branch information
gmaclennan committed Nov 9, 2023
1 parent ab77e51 commit ca95b2f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 58 deletions.
12 changes: 8 additions & 4 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db'
const MAX_FILE_DESCRIPTORS = 768

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
Expand Down Expand Up @@ -129,7 +130,7 @@ export class MapeoManager extends TypedEmitter {
this.#localDiscovery = new LocalDiscovery({
identityKeypair: this.#keyManager.getIdentityKeypair(),
})
this.#localDiscovery.on('connection', this.replicate.bind(this))
this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this))
}

/**
Expand All @@ -140,12 +141,15 @@ export class MapeoManager extends TypedEmitter {
}

/**
* Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for
* local (trusted) connections, because the RPC channel key is public
* Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects
* the Mapeo RPC channel and allows invites. All active projects will sync
* automatically to this replication stream. Only use for local (trusted)
* connections, because the RPC channel key is public. To sync a specific
* project without connecting RPC, use project[kProjectReplication].
*
* @param {import('@hyperswarm/secret-stream')<any>} noiseStream
*/
replicate(noiseStream) {
[kManagerReplicate](noiseStream) {
const replicationStream = this.#localPeers.connect(noiseStream)
Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)])
.then(([{ name }, openedNoiseStream]) => {
Expand Down
35 changes: 25 additions & 10 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
import { Capabilities } from './capabilities.js'
import { getDeviceId, projectKeyToId, valueOf } from './utils.js'
import { MemberApi } from './member-api.js'
import { SyncController } from './sync/sync-controller.js'
import { SyncApi, kSyncReplicate } from './sync/sync-api.js'
import Hypercore from 'hypercore'

/** @typedef {Omit<import('@mapeo/schema').ProjectSettingsValue, 'schemaName'>} EditableProjectSettings */
Expand All @@ -42,7 +42,7 @@ const INDEXER_STORAGE_FOLDER_NAME = 'indexer'
export const kCoreOwnership = Symbol('coreOwnership')
export const kCapabilities = Symbol('capabilities')
export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo')
export const kReplicate = Symbol('replicate')
export const kProjectReplicate = Symbol('replicate project')

export class MapeoProject {
#projectId
Expand All @@ -56,7 +56,7 @@ export class MapeoProject {
#capabilities
#ownershipWriteDone
#memberApi
#syncController
#syncApi

/**
* @param {Object} opts
Expand Down Expand Up @@ -245,15 +245,17 @@ export class MapeoProject {
},
})

this.#syncController = new SyncController({
this.#syncApi = new SyncApi({
coreManager: this.#coreManager,
capabilities: this.#capabilities,
})

///////// 4. Wire up sync

// Replicate already connected local peers
for (const peer of localPeers.peers) {
if (peer.status !== 'connected') continue
this.#syncController.replicate(peer.protomux)
this.#syncApi[kSyncReplicate](peer.protomux)
}

localPeers.on('discovery-key', (discoveryKey, stream) => {
Expand All @@ -267,10 +269,10 @@ export class MapeoProject {
// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#syncController.replicate(peer.protomux)
this.#syncApi[kSyncReplicate](peer.protomux)
})

///////// 4. Write core ownership record
///////// 5. Write core ownership record

const deferred = pDefer()
// Avoid uncaught rejection. If this is rejected then project.ready() will reject
Expand Down Expand Up @@ -365,6 +367,10 @@ export class MapeoProject {
return this.#memberApi
}

get $sync() {
return this.#syncApi
}

/**
* @param {Partial<EditableProjectSettings>} settings
* @returns {Promise<EditableProjectSettings>}
Expand Down Expand Up @@ -416,15 +422,24 @@ export class MapeoProject {
}

/**
* Replicate a project to a @hyperswarm/secret-stream. Invites will not
* function because the RPC channel is not connected for project replication,
* and only this project will replicate (to replicate multiple projects you
* need to replicate the manager instance via manager[kManagerReplicate])
*
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @returns
*/
[kReplicate](stream) {
const replicationStream = Hypercore.createProtocolStream(stream, {})
[kProjectReplicate](stream) {
const replicationStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream)
},
})
const protomux = replicationStream.noiseStream.userData
// @ts-ignore - got fed up jumping through hoops to keep TS heppy
return this.#syncController.replicate(protomux)
this.#syncApi[kSyncReplicate](protomux)
return replicationStream
}

/**
Expand Down
82 changes: 82 additions & 0 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { SyncState } from './sync-state.js'
import { PeerSyncController } from './peer-sync-controller.js'

export const kSyncReplicate = Symbol('replicate sync')

/**
* @typedef {object} SyncEvents
* @property {(syncState: import('./sync-state.js').State) => void} sync-state
*/

/**
* @extends {TypedEmitter<SyncEvents>}
*/
export class SyncApi extends TypedEmitter {
syncState
#coreManager
#capabilities
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()
/** @type {Set<'local' | 'remote'>} */
#dataSyncEnabled = new Set()

/**
*
* @param {object} opts
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {import("../capabilities.js").Capabilities} opts.capabilities
* @param {number} [opts.throttleMs]
*/
constructor({ coreManager, throttleMs = 200, capabilities }) {
super()
this.#coreManager = coreManager
this.#capabilities = capabilities
this.syncState = new SyncState({ coreManager, throttleMs })
this.syncState.on('state', this.emit.bind(this, 'sync-state'))
}

getState() {
return this.syncState.getState()
}

/**
* Start syncing data cores
*/
start() {
if (this.#dataSyncEnabled.has('local')) return
this.#dataSyncEnabled.add('local')
for (const peerSyncController of this.#peerSyncControllers.values()) {
peerSyncController.enableDataSync()
}
}

/**
* Stop syncing data cores (metadata cores will continue syncing in the background)
*/
stop() {
if (!this.#dataSyncEnabled.has('local')) return
this.#dataSyncEnabled.delete('local')
for (const peerSyncController of this.#peerSyncControllers.values()) {
peerSyncController.disableDataSync()
}
}

/**
* @param {import('protomux')<import('@hyperswarm/secret-stream')>} protomux A protomux instance
*/
[kSyncReplicate](protomux) {
if (this.#peerSyncControllers.has(protomux)) return

const peerSyncController = new PeerSyncController({
protomux,
coreManager: this.#coreManager,
syncState: this.syncState,
capabilities: this.#capabilities,
})
if (this.#dataSyncEnabled.has('local')) {
peerSyncController.enableDataSync()
}
this.#peerSyncControllers.set(protomux, peerSyncController)
}
}
44 changes: 0 additions & 44 deletions src/sync/sync-controller.js

This file was deleted.

0 comments on commit ca95b2f

Please sign in to comment.