From ca95b2f1105d850c483a9da69a4ed0e0c060ae87 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 16:10:05 +0900 Subject: [PATCH] feat: add `$sync` API methods (#361) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt * feat: add $sync API methods For now this simplifies the API (because we are only supporting local sync, not remote sync over the internet) to: - `project.$sync.getState()` - `project.$sync.start()` - `project.$sync.stop()` - Events - `sync-state` It's currently not possible to stop local discovery, nor is it possible to stop sync of the metadata namespaces (auth, config, blobIndex). The start and stop methods stop the sync of the data and blob namespaces. Fixes #134. Stacked on #360, #358 and #356. --- src/mapeo-manager.js | 12 ++++-- src/mapeo-project.js | 35 +++++++++++----- src/sync/sync-api.js | 82 +++++++++++++++++++++++++++++++++++++ src/sync/sync-controller.js | 44 -------------------- 4 files changed, 115 insertions(+), 58 deletions(-) create mode 100644 src/sync/sync-api.js delete mode 100644 src/sync/sync-controller.js diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 99922662..2ac56b05 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -40,6 +40,7 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db' const MAX_FILE_DESCRIPTORS = 768 export const kRPC = Symbol('rpc') +export const kManagerReplicate = Symbol('replicate manager') /** * @typedef {Omit} PublicPeerInfo @@ -129,7 +130,7 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), }) - this.#localDiscovery.on('connection', this.replicate.bind(this)) + this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) } /** @@ -140,12 +141,15 @@ export class MapeoManager extends TypedEmitter { } /** - * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for - * local (trusted) connections, because the RPC channel key is public + * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects + * the Mapeo RPC channel and allows invites. All active projects will sync + * automatically to this replication stream. Only use for local (trusted) + * connections, because the RPC channel key is public. To sync a specific + * project without connecting RPC, use project[kProjectReplication]. * * @param {import('@hyperswarm/secret-stream')} noiseStream */ - replicate(noiseStream) { + [kManagerReplicate](noiseStream) { const replicationStream = this.#localPeers.connect(noiseStream) Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) .then(([{ name }, openedNoiseStream]) => { diff --git a/src/mapeo-project.js b/src/mapeo-project.js index cf980517..66992296 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -32,7 +32,7 @@ import { import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' -import { SyncController } from './sync/sync-controller.js' +import { SyncApi, kSyncReplicate } from './sync/sync-api.js' import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -42,7 +42,7 @@ const INDEXER_STORAGE_FOLDER_NAME = 'indexer' export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') -export const kReplicate = Symbol('replicate') +export const kProjectReplicate = Symbol('replicate project') export class MapeoProject { #projectId @@ -56,7 +56,7 @@ export class MapeoProject { #capabilities #ownershipWriteDone #memberApi - #syncController + #syncApi /** * @param {Object} opts @@ -245,15 +245,17 @@ export class MapeoProject { }, }) - this.#syncController = new SyncController({ + this.#syncApi = new SyncApi({ coreManager: this.#coreManager, capabilities: this.#capabilities, }) + ///////// 4. Wire up sync + // Replicate already connected local peers for (const peer of localPeers.peers) { if (peer.status !== 'connected') continue - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) } localPeers.on('discovery-key', (discoveryKey, stream) => { @@ -267,10 +269,10 @@ export class MapeoProject { // When a new peer is found, try to replicate (if it is not a member of the // project it will fail the capability check and be ignored) localPeers.on('peer-add', (peer) => { - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) }) - ///////// 4. Write core ownership record + ///////// 5. Write core ownership record const deferred = pDefer() // Avoid uncaught rejection. If this is rejected then project.ready() will reject @@ -365,6 +367,10 @@ export class MapeoProject { return this.#memberApi } + get $sync() { + return this.#syncApi + } + /** * @param {Partial} settings * @returns {Promise} @@ -416,15 +422,24 @@ export class MapeoProject { } /** + * Replicate a project to a @hyperswarm/secret-stream. Invites will not + * function because the RPC channel is not connected for project replication, + * and only this project will replicate (to replicate multiple projects you + * need to replicate the manager instance via manager[kManagerReplicate]) * * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ - [kReplicate](stream) { - const replicationStream = Hypercore.createProtocolStream(stream, {}) + [kProjectReplicate](stream) { + const replicationStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: async (discoveryKey) => { + this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream) + }, + }) const protomux = replicationStream.noiseStream.userData // @ts-ignore - got fed up jumping through hoops to keep TS heppy - return this.#syncController.replicate(protomux) + this.#syncApi[kSyncReplicate](protomux) + return replicationStream } /** diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js new file mode 100644 index 00000000..9508adc1 --- /dev/null +++ b/src/sync/sync-api.js @@ -0,0 +1,82 @@ +import { TypedEmitter } from 'tiny-typed-emitter' +import { SyncState } from './sync-state.js' +import { PeerSyncController } from './peer-sync-controller.js' + +export const kSyncReplicate = Symbol('replicate sync') + +/** + * @typedef {object} SyncEvents + * @property {(syncState: import('./sync-state.js').State) => void} sync-state + */ + +/** + * @extends {TypedEmitter} + */ +export class SyncApi extends TypedEmitter { + syncState + #coreManager + #capabilities + /** @type {Map} */ + #peerSyncControllers = new Map() + /** @type {Set<'local' | 'remote'>} */ + #dataSyncEnabled = new Set() + + /** + * + * @param {object} opts + * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {number} [opts.throttleMs] + */ + constructor({ coreManager, throttleMs = 200, capabilities }) { + super() + this.#coreManager = coreManager + this.#capabilities = capabilities + this.syncState = new SyncState({ coreManager, throttleMs }) + this.syncState.on('state', this.emit.bind(this, 'sync-state')) + } + + getState() { + return this.syncState.getState() + } + + /** + * Start syncing data cores + */ + start() { + if (this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.add('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.enableDataSync() + } + } + + /** + * Stop syncing data cores (metadata cores will continue syncing in the background) + */ + stop() { + if (!this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.delete('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.disableDataSync() + } + } + + /** + * @param {import('protomux')} protomux A protomux instance + */ + [kSyncReplicate](protomux) { + if (this.#peerSyncControllers.has(protomux)) return + + const peerSyncController = new PeerSyncController({ + protomux, + coreManager: this.#coreManager, + syncState: this.syncState, + capabilities: this.#capabilities, + }) + if (this.#dataSyncEnabled.has('local')) { + peerSyncController.enableDataSync() + } + this.#peerSyncControllers.set(protomux, peerSyncController) + } +} diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js deleted file mode 100644 index a32b6cb9..00000000 --- a/src/sync/sync-controller.js +++ /dev/null @@ -1,44 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { SyncState } from './sync-state.js' -import { PeerSyncController } from './peer-sync-controller.js' - -export class SyncController extends TypedEmitter { - #syncState - #coreManager - #capabilities - /** @type {Map} */ - #peerSyncControllers = new Map() - - /** - * - * @param {object} opts - * @param {import('../core-manager/index.js').CoreManager} opts.coreManager - * @param {import("../capabilities.js").Capabilities} opts.capabilities - * @param {number} [opts.throttleMs] - */ - constructor({ coreManager, throttleMs = 200, capabilities }) { - super() - this.#coreManager = coreManager - this.#capabilities = capabilities - this.#syncState = new SyncState({ coreManager, throttleMs }) - } - - getState() { - return this.#syncState.getState() - } - - /** - * @param {import('protomux')} protomux A protomux instance - */ - replicate(protomux) { - if (this.#peerSyncControllers.has(protomux)) return - - const peerSyncController = new PeerSyncController({ - protomux, - coreManager: this.#coreManager, - syncState: this.#syncState, - capabilities: this.#capabilities, - }) - this.#peerSyncControllers.set(protomux, peerSyncController) - } -}