From 63c2cd68f6fc82f5fc1d968c2b614582c8aa7ed7 Mon Sep 17 00:00:00 2001 From: Andrew Chou Date: Thu, 26 Oct 2023 11:49:17 -0400 Subject: [PATCH 1/6] chore: update @mapeo/schema and @mapeo/sqlite-indexer (#357) --- package-lock.json | 16 ++++++++-------- package.json | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/package-lock.json b/package-lock.json index 319dfb60..fe47420b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,8 +14,8 @@ "@fastify/type-provider-typebox": "^3.3.0", "@hyperswarm/secret-stream": "^6.1.2", "@mapeo/crypto": "1.0.0-alpha.10", - "@mapeo/schema": "3.0.0-next.11", - "@mapeo/sqlite-indexer": "1.0.0-alpha.6", + "@mapeo/schema": "3.0.0-next.13", + "@mapeo/sqlite-indexer": "1.0.0-alpha.8", "@sinclair/typebox": "^0.29.6", "b4a": "^1.6.3", "base32.js": "^0.1.0", @@ -824,9 +824,9 @@ } }, "node_modules/@mapeo/schema": { - "version": "3.0.0-next.11", - "resolved": "https://registry.npmjs.org/@mapeo/schema/-/schema-3.0.0-next.11.tgz", - "integrity": "sha512-tZnIzNmXpKNSkqEZQP9rCb91toKga/jrSF9RIUsYeIMamsePHtLuTxF2BEVdT81/P+NLgHt57uXIXEIefU3Usw==", + "version": "3.0.0-next.13", + "resolved": "https://registry.npmjs.org/@mapeo/schema/-/schema-3.0.0-next.13.tgz", + "integrity": "sha512-g5+Lx0uGzq5i2nlrDvuPExkzrQpzx3dhl1G1gfXm72Hw8he2ecGOm5Gu9vW9PsfKtN45AKRy+jd7kfl9+jLhpw==", "dependencies": { "@json-schema-tools/dereferencer": "^1.6.1", "ajv": "^8.12.0", @@ -922,9 +922,9 @@ } }, "node_modules/@mapeo/sqlite-indexer": { - "version": "1.0.0-alpha.6", - "resolved": "https://registry.npmjs.org/@mapeo/sqlite-indexer/-/sqlite-indexer-1.0.0-alpha.6.tgz", - "integrity": "sha512-iLUePxr2kHgsWfFTuJAKjTSZCRuVsIVNbQVyLEkN0pX/2dWzljCxCRvO+9rc1x+bThUas96ZAzCedqeeqC0zRw==", + "version": "1.0.0-alpha.8", + "resolved": "https://registry.npmjs.org/@mapeo/sqlite-indexer/-/sqlite-indexer-1.0.0-alpha.8.tgz", + "integrity": "sha512-qU+I6L4QKp6CkNA5AYu8dqADWaX+usfZq89c+fKOmIRZ+jR9ta3790PzCQbr5VCaYPDfp0OfemO1EjJ7RhHfcQ==", "dependencies": { "@types/better-sqlite3": "^7.6.4", "better-sqlite3": "^8.4.0" diff --git a/package.json b/package.json index 63c7a69a..7fcbd02c 100644 --- a/package.json +++ b/package.json @@ -111,8 +111,8 @@ "@fastify/type-provider-typebox": "^3.3.0", "@hyperswarm/secret-stream": "^6.1.2", "@mapeo/crypto": "1.0.0-alpha.10", - "@mapeo/schema": "3.0.0-next.11", - "@mapeo/sqlite-indexer": "1.0.0-alpha.6", + "@mapeo/schema": "3.0.0-next.13", + "@mapeo/sqlite-indexer": "1.0.0-alpha.8", "@sinclair/typebox": "^0.29.6", "b4a": "^1.6.3", "base32.js": "^0.1.0", From 1f4abac31b84fcc5a73a9db2ef7f8fafbffeb3ba Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Tue, 31 Oct 2023 01:16:58 +0900 Subject: [PATCH 2/6] feat: MapeoRPC -> LocalPeers (#356) --- src/discovery/local-discovery.js | 17 +- src/invite-api.js | 2 +- src/{rpc/index.js => local-peers.js} | 162 +++++++++++------ src/mapeo-manager.js | 4 +- src/mapeo-project.js | 2 +- src/member-api.js | 2 +- src/utils.js | 7 +- test-e2e/manager-invite.js | 10 +- test-e2e/members.js | 4 +- test-types/data-types.ts | 4 +- tests/helpers/{rpc.js => local-peers.js} | 14 +- tests/invite-api.js | 84 ++++----- tests/{rpc.js => local-peers.js} | 213 +++++++++++++---------- 13 files changed, 299 insertions(+), 226 deletions(-) rename src/{rpc/index.js => local-peers.js} (74%) rename tests/helpers/{rpc.js => local-peers.js} (78%) rename tests/{rpc.js => local-peers.js} (69%) diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 33ec0863..6c1ade7b 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ +/** @typedef {import('../utils.js').OpenedNoiseStream} OpenedNoiseStream */ export const ERR_DUPLICATE = 'Duplicate connection' /** * @typedef {Object} DiscoveryEvents - * @property {(connection: import('@hyperswarm/secret-stream')) => void} connection + * @property {(connection: OpenedNoiseStream) => void} connection */ /** @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection' export class LocalDiscovery extends TypedEmitter { #identityKeypair #server - /** @type {Map>} */ + /** @type {Map} */ #noiseConnections = new Map() #dnssd #sm @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter { // Further errors will be handled in #handleNoiseStreamConnection() socket.off('error', onSocketError) secretStream.off('error', this.#handleSocketError) - this.#handleNoiseStreamConnection(secretStream) + this.#handleNoiseStreamConnection( + // We know the NoiseStream is open at this point, so we can coerce the type + /** @type {OpenedNoiseStream} */ + (secretStream) + ) }) } /** * - * @param {NoiseSecretStream} existing - * @param {NoiseSecretStream} keeping + * @param {OpenedNoiseStream} existing + * @param {OpenedNoiseStream} keeping */ #handleConnectionSwap(existing, keeping) { let closed = false @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter { /** * - * @param {NoiseSecretStream} conn + * @param {OpenedNoiseStream} conn * @returns */ #handleNoiseStreamConnection(conn) { diff --git a/src/invite-api.js b/src/invite-api.js index cb871d0a..03dd7297 100644 --- a/src/invite-api.js +++ b/src/invite-api.js @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter { /** * @param {Object} options - * @param {import('./rpc/index.js').MapeoRPC} options.rpc + * @param {import('./local-peers.js').LocalPeers} options.rpc * @param {object} options.queries * @param {(projectId: string) => Promise} options.queries.isMember * @param {(invite: import('./generated/rpc.js').Invite) => Promise} options.queries.addProject diff --git a/src/rpc/index.js b/src/local-peers.js similarity index 74% rename from src/rpc/index.js rename to src/local-peers.js index f3551c10..5989a5bd 100644 --- a/src/rpc/index.js +++ b/src/local-peers.js @@ -1,14 +1,14 @@ // @ts-check import { TypedEmitter } from 'tiny-typed-emitter' import Protomux from 'protomux' -import { openedNoiseSecretStream, keyToId } from '../utils.js' +import { openedNoiseSecretStream, keyToId } from './utils.js' import cenc from 'compact-encoding' import { DeviceInfo, Invite, InviteResponse, InviteResponse_Decision, -} from '../generated/rpc.js' +} from './generated/rpc.js' import pDefer from 'p-defer' const PROTOCOL_NAME = 'mapeo/rpc' @@ -16,7 +16,7 @@ const PROTOCOL_NAME = 'mapeo/rpc' // Protomux message types depend on the order that messages are added to a // channel (this needs to remain consistent). To avoid breaking changes, the // types here should not change. -/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */ +/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */ const MESSAGE_TYPES = { Invite: 0, InviteResponse: 1, @@ -24,10 +24,19 @@ const MESSAGE_TYPES = { } const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) -/** @typedef {Peer['info']} PeerInfoInternal */ -/** @typedef {Omit & { status: Exclude }} PeerInfo */ -/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */ -/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ +/** + * @typedef {object} PeerInfoBase + * @property {string} deviceId + * @property {string | undefined} name + */ +/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ + +/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ +/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */ +/** @typedef {PeerInfoInternal['status']} PeerState */ +/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ /** * @template ValueType @@ -44,6 +53,12 @@ class Peer { #connected /** @type {Map>>} */ pendingInvites = new Map() + /** @type {string | undefined} */ + #name + #connectedAt = 0 + #disconnectedAt = 0 + /** @type {Protomux} */ + #protomux /** * @param {object} options @@ -55,40 +70,65 @@ class Peer { this.#channel = channel this.#connected = pDefer() } + /** @returns {PeerInfoInternal} */ get info() { - return { - status: this.#state, - id: keyToId(this.#publicKey), - } - } - /** - * Poor-man's finite state machine. Rather than a `setState` method, only - * allows specific transitions between states. - * - * @param {'connect' | 'disconnect'} type - */ - action(type) { - switch (type) { - case 'connect': - /* c8 ignore next 3 */ - if (this.#state !== 'connecting') { - return // TODO: report error - this should not happen + const deviceId = keyToId(this.#publicKey) + switch (this.#state) { + case 'connecting': + return { + status: this.#state, + deviceId, + name: this.#name, } - this.#state = 'connected' - this.#connected.resolve() - break - case 'disconnect': - /* c8 ignore next */ - if (this.#state === 'disconnected') return - this.#state = 'disconnected' - for (const pending of this.pendingInvites.values()) { - for (const { reject } of pending) { - reject(new PeerDisconnectedError()) - } + case 'connected': + return { + status: this.#state, + deviceId, + name: this.#name, + connectedAt: this.#connectedAt, + protomux: this.#protomux, } - this.pendingInvites.clear() - break + case 'disconnected': + return { + status: this.#state, + deviceId, + name: this.#name, + disconnectedAt: this.#disconnectedAt, + } + /* c8 ignore next 4 */ + default: { + /** @type {never} */ + const _exhaustiveCheck = this.#state + return _exhaustiveCheck + } + } + } + /** @param {Protomux} protomux */ + connect(protomux) { + this.#protomux = protomux + /* c8 ignore next 3 */ + if (this.#state !== 'connecting') { + return // TODO: report error - this should not happen } + this.#state = 'connected' + this.#connectedAt = Date.now() + this.#connected.resolve() + } + disconnect() { + // @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances + this.#protomux = undefined + /* c8 ignore next */ + if (this.#state === 'disconnected') return + this.#state = 'disconnected' + this.#disconnectedAt = Date.now() + // Can just resolve this rather than reject, because #assertConnected will throw the error + this.#connected.resolve() + for (const pending of this.pendingInvites.values()) { + for (const { reject } of pending) { + reject(new PeerDisconnectedError()) + } + } + this.pendingInvites.clear() } /** @param {InviteWithKeys} invite */ async sendInvite(invite) { @@ -111,6 +151,10 @@ class Peer { const messageType = MESSAGE_TYPES.DeviceInfo this.#channel.messages[messageType].send(buf) } + /** @param {DeviceInfo} deviceInfo */ + receiveDeviceInfo(deviceInfo) { + this.#name = deviceInfo.name + } async #assertConnected() { await this.#connected.promise if (this.#state === 'connected' && !this.#channel.closed) return @@ -120,23 +164,18 @@ class Peer { } /** - * @typedef {object} MapeoRPCEvents + * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received - * @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device */ -/** @extends {TypedEmitter} */ -export class MapeoRPC extends TypedEmitter { +/** @extends {TypedEmitter} */ +export class LocalPeers extends TypedEmitter { /** @type {Map} */ #peers = new Map() /** @type {Set>} */ #opening = new Set() - constructor() { - super() - } - static InviteResponse = InviteResponse_Decision /** @@ -221,7 +260,8 @@ export class MapeoRPC extends TypedEmitter { /** * Connect to a peer over an existing NoiseSecretStream * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @param {import('./types.js').NoiseStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @returns {import('./types.js').ReplicationStream} */ connect(stream) { if (!stream.noiseStream) throw new Error('Invalid stream') @@ -229,8 +269,12 @@ export class MapeoRPC extends TypedEmitter { stream.userData && Protomux.isProtomux(stream.userData) ? stream.userData : Protomux.from(stream) + stream.userData = protomux this.#opening.add(stream.opened) + // No need to connect error handler to stream because Protomux does this, + // and errors are eventually handled by #closePeer + // noiseSecretStream.remotePublicKey can be null before the stream has // opened, so this helped awaits the open openedNoiseSecretStream(stream).then((stream) => { @@ -254,7 +298,7 @@ export class MapeoRPC extends TypedEmitter { userData: null, protocol: PROTOCOL_NAME, messages, - onopen: this.#openPeer.bind(this, remotePublicKey), + onopen: this.#openPeer.bind(this, remotePublicKey, protomux), onclose: this.#closePeer.bind(this, remotePublicKey), }) channel.open() @@ -263,27 +307,28 @@ export class MapeoRPC extends TypedEmitter { const existingPeer = this.#peers.get(peerId) /* c8 ignore next 3 */ if (existingPeer && existingPeer.info.status !== 'disconnected') { - existingPeer.action('disconnect') // Should not happen, but in case + existingPeer.disconnect() // Should not happen, but in case } const peer = new Peer({ publicKey: remotePublicKey, channel }) this.#peers.set(peerId, peer) // Do not emit peers now - will emit when connected }) - return stream + return stream.rawStream } - /** @param {Buffer} publicKey */ - #openPeer(publicKey) { + /** + * @param {Buffer} publicKey + * @param {Protomux} protomux + */ + #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - // No-op if no change in state - /* c8 ignore next */ - if (peer.info.status === 'connected') return // TODO: report error - this should not happen - peer.action('connect') - this.#emitPeers() + const wasConnected = peer.info.status === 'connected' + peer.connect(protomux) + if (!wasConnected) this.#emitPeers() } /** @param {Buffer} publicKey */ @@ -296,7 +341,7 @@ export class MapeoRPC extends TypedEmitter { /* c8 ignore next */ if (peer.info.status === 'disconnected') return // TODO: Track reasons for closing - peer.action('disconnect') + peer.disconnect() this.#emitPeers() } @@ -348,7 +393,8 @@ export class MapeoRPC extends TypedEmitter { } case 'DeviceInfo': { const deviceInfo = DeviceInfo.decode(value) - this.emit('device-info', { ...deviceInfo, deviceId: peerId }) + peer.receiveDeviceInfo(deviceInfo) + this.#emitPeers() break } /* c8 ignore next 5 */ diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 23842de7..32965a23 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -22,7 +22,7 @@ import { projectKeyToPublicId, } from './utils.js' import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' -import { MapeoRPC } from './rpc/index.js' +import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -69,7 +69,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new MapeoRPC() + this.#rpc = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ diff --git a/src/mapeo-project.js b/src/mapeo-project.js index dd5bd399..ed24cbc5 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -67,7 +67,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * */ constructor({ diff --git a/src/member-api.js b/src/member-api.js index d79b2201..1d2f490d 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter { * @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership * @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys * @param {Buffer} opts.projectKey - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * @param {Object} opts.dataTypes * @param {Pick} opts.dataTypes.deviceInfo * @param {Pick} opts.dataTypes.project diff --git a/src/utils.js b/src/utils.js index 92c0568f..07fc064e 100644 --- a/src/utils.js +++ b/src/utils.js @@ -46,9 +46,12 @@ export function truncateId(keyOrId, length = 3) { return keyToId(keyOrId).slice(0, length) } -/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ +/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ /** @typedef {NoiseStream & { destroyed: true }} DestroyedNoiseStream */ -/** @typedef {NoiseStream & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream */ +/** + * @template {import('node:stream').Duplex | import('streamx').Duplex} [T=import('node:stream').Duplex | import('streamx').Duplex] + * @typedef {import('@hyperswarm/secret-stream') & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream + */ /** * Utility to await a NoiseSecretStream to open, that returns a stream with the diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index 437ba429..4a0b866c 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -5,7 +5,7 @@ import RAM from 'random-access-memory' import { MEMBER_ROLE_ID } from '../src/capabilities.js' import { InviteResponse_Decision } from '../src/generated/rpc.js' import { MapeoManager, kRPC } from '../src/mapeo-manager.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('member invite accepted', async (t) => { t.plan(10) @@ -25,7 +25,7 @@ test('member invite accepted', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -52,7 +52,7 @@ test('member invite accepted', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { @@ -119,7 +119,7 @@ test('member invite rejected', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -146,7 +146,7 @@ test('member invite rejected', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { diff --git a/test-e2e/members.js b/test-e2e/members.js index cbef3fef..64c28dee 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -11,7 +11,7 @@ import { MEMBER_ROLE_ID, NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('getting yourself after creating project', async (t) => { const { manager } = setup() @@ -191,7 +191,7 @@ function setup() { }) manager[kRPC].on('peers', (peers) => { - const deviceId = peers[0].id + const deviceId = peers[0].deviceId project.$member .invite(deviceId, { roleId }) .then(() => deferred.resolve(deviceId)) diff --git a/test-types/data-types.ts b/test-types/data-types.ts index f93833ad..10b2fcb1 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -14,7 +14,7 @@ import { drizzle } from 'drizzle-orm/better-sqlite3' import RAM from 'random-access-memory' import { IndexWriter } from '../dist/index-writer/index.js' import { projectSettingsTable } from '../dist/schema/client.js' -import { MapeoRPC } from '../dist/rpc/index.js' +import { LocalPeers } from '../dist/local-peers.js' import { Expect, type Equal } from './utils.js' type Forks = { forks: string[] } @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new MapeoRPC(), + rpc: new LocalPeers(), }) ///// Observations diff --git a/tests/helpers/rpc.js b/tests/helpers/local-peers.js similarity index 78% rename from tests/helpers/rpc.js rename to tests/helpers/local-peers.js index 6858e1f1..1b7a8ce8 100644 --- a/tests/helpers/rpc.js +++ b/tests/helpers/local-peers.js @@ -5,10 +5,9 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc1 - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc2 + * @param {import('../../src/local-peers.js').LocalPeers} rpc1 + * @param {import('../../src/local-peers.js').LocalPeers} rpc2 * @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs] - * @returns {() => Promise<[void, void]>} */ export function replicate( rpc1, @@ -29,25 +28,24 @@ export function replicate( // @ts-expect-error n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - // @ts-expect-error rpc1.connect(n1) - // @ts-expect-error rpc2.connect(n2) - return async function destroy() { + /** @param {Error} [e] */ + return async function destroy(e) { return Promise.all([ /** @type {Promise} */ ( new Promise((res) => { n1.on('close', res) - n1.destroy() + n1.destroy(e) }) ), /** @type {Promise} */ ( new Promise((res) => { n2.on('close', res) - n2.destroy() + n2.destroy(e) }) ), ]) diff --git a/tests/invite-api.js b/tests/invite-api.js index de24e127..010a955f 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -1,10 +1,10 @@ import test from 'brittle' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' -import { MapeoRPC } from '../src/rpc/index.js' +import { LocalPeers } from '../src/local-peers.js' import { InviteApi } from '../src/invite-api.js' import { projectKeyToPublicId } from '../src/utils.js' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import NoiseSecretStream from '@hyperswarm/secret-stream' import pDefer from 'p-defer' @@ -15,7 +15,7 @@ test('invite-received event has expected payload', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -33,13 +33,13 @@ test('invite-received event has expected payload', async (t) => { r2.on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) r1.on('peers', (peers) => { t.is(peers.length, 1) - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, projectInfo: { name: 'Mapeo' }, @@ -65,7 +65,7 @@ test('Accept invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -83,12 +83,12 @@ test('Accept invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -109,7 +109,7 @@ test('Reject invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -127,12 +127,12 @@ test('Reject invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -152,7 +152,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -169,14 +169,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -195,7 +195,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false const inviteApi = new InviteApi({ @@ -213,14 +213,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -242,7 +242,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -257,19 +257,19 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) r1.on('peers', async (peers) => { - const response1 = await r1.invite(peers[0].id, { + const response1 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response1, MapeoRPC.InviteResponse.ACCEPT) + t.is(response1, LocalPeers.InviteResponse.ACCEPT) - const response2 = await r1.invite(peers[0].id, { + const response2 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response2, MapeoRPC.InviteResponse.ALREADY) + t.is(response2, LocalPeers.InviteResponse.ALREADY) }) let inviteReceivedEventCount = 0 @@ -286,7 +286,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) test('trying to accept or reject non-existent invite throws', async (t) => { - const rpc = new MapeoRPC() + const rpc = new LocalPeers() const inviteApi = new InviteApi({ rpc, queries: { @@ -307,7 +307,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -322,7 +322,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { r1.on('peers', async (peers) => { if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -345,7 +345,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -359,7 +359,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -381,7 +381,7 @@ test('invitor disconnecting results in invite already response not throwing', as const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false @@ -399,7 +399,7 @@ test('invitor disconnecting results in invite already response not throwing', as if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -422,7 +422,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -435,7 +435,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { }) r1.on('peers', (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -455,7 +455,7 @@ test('Invite from multiple peers', async (t) => { t.plan(5 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -492,19 +492,19 @@ test('Invite from multiple peers', async (t) => { }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() invitor.on('peers', async (peers) => { if (++connected === invitorCount) deferred.resolve() - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) if (first === keyPair.publicKey.toString('hex')) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } }) replicate(invitee, invitor, { kp1: inviteeKeyPair, kp2: keyPair }) @@ -517,7 +517,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv t.plan(8 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -562,22 +562,22 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() const invitorId = keyPair.publicKey.toString('hex') invitor.on('peers', async (peers) => { if (peers[0].status !== 'connected') return if (++connected === invitorCount) deferred.resolve() try { - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) if (invitorId === invitesReceived[1]) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } } catch (e) { t.is( @@ -598,7 +598,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv function setup() { const encryptionKeys = { auth: randomBytes(32) } const projectKey = KeyManager.generateProjectKeypair().publicKey - const rpc = new MapeoRPC() + const rpc = new LocalPeers() return { rpc, diff --git a/tests/rpc.js b/tests/local-peers.js similarity index 69% rename from tests/rpc.js rename to tests/local-peers.js index 83cd9069..dc704f8a 100644 --- a/tests/rpc.js +++ b/tests/local-peers.js @@ -1,39 +1,40 @@ // @ts-check import test from 'brittle' import { - MapeoRPC, + LocalPeers, PeerDisconnectedError, TimeoutError, UnknownPeerError, -} from '../src/rpc/index.js' +} from '../src/local-peers.js' import FakeTimers from '@sinonjs/fake-timers' import { once } from 'events' import { Duplex } from 'streamx' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import { randomBytes } from 'node:crypto' import NoiseSecretStream from '@hyperswarm/secret-stream' +import Protomux from 'protomux' test('Send invite and accept', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -41,8 +42,8 @@ test('Send invite and accept', async (t) => { }) test('Send invite immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -62,33 +63,33 @@ test('Send invite immediately', async (t) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) - t.is(await responsePromise, MapeoRPC.InviteResponse.ACCEPT) + t.is(await responsePromise, LocalPeers.InviteResponse.ACCEPT) }) test('Send invite and reject', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -96,8 +97,8 @@ test('Send invite and reject', async (t) => { }) test('Invite to unknown peer', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const unknownPeerId = Buffer.allocUnsafe(32).fill(1).toString('hex') @@ -115,7 +116,7 @@ test('Invite to unknown peer', async (t) => { () => r2.inviteResponse(unknownPeerId, { projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }), UnknownPeerError ) @@ -123,25 +124,25 @@ test('Invite to unknown peer', async (t) => { test('Send invite and already on project', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ALREADY) + t.is(response, LocalPeers.InviteResponse.ALREADY) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ALREADY, + decision: LocalPeers.InviteResponse.ALREADY, }) }) @@ -150,8 +151,8 @@ test('Send invite and already on project', async (t) => { test('Send invite with encryption key', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const encryptionKeys = { @@ -161,11 +162,11 @@ test('Send invite with encryption key', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -177,7 +178,7 @@ test('Send invite with encryption key', async (t) => { ) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -186,20 +187,20 @@ test('Send invite with encryption key', async (t) => { test('Send invite with project info', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const projectInfo = { name: 'MyProject' } r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, projectInfo, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -207,7 +208,7 @@ test('Send invite with project info', async (t) => { t.alike(invite.projectInfo, projectInfo, 'project info is sent with invite') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -216,8 +217,8 @@ test('Send invite with project info', async (t) => { test('Disconnected peer shows in state', async (t) => { t.plan(6) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() let peerStateUpdates = 0 r1.on('peers', async (peers) => { @@ -225,7 +226,7 @@ test('Disconnected peer shows in state', async (t) => { if (peers[0].status === 'connected') { t.pass('peer appeared as connected') t.is(++peerStateUpdates, 1) - destroy() + destroy(new Error()) } else { t.pass('peer appeared as disconnected') t.is(++peerStateUpdates, 2) @@ -235,16 +236,26 @@ test('Disconnected peer shows in state', async (t) => { const destroy = replicate(r1, r2) }) +test('next tick disconnect does not throw', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + + const destroy = replicate(r1, r2) + await Promise.resolve() + destroy(new Error()) + t.pass() +}) + test('Disconnect results in rejected invite', async (t) => { t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { if (peers[0].status === 'connected') { - const invite = r1.invite(peers[0].id, { + const invite = r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -268,9 +279,9 @@ test('Disconnect results in rejected invite', async (t) => { test('Invite to multiple peers', async (t) => { // This is catches not tracking invites by peer t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() - const r3 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() + const r3 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -279,7 +290,7 @@ test('Invite to multiple peers', async (t) => { t.pass('connected to two peers') const responses = await Promise.all( peers.map((peer) => - r1.invite(peer.id, { + r1.invite(peer.deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -287,7 +298,7 @@ test('Invite to multiple peers', async (t) => { ) t.alike( responses.sort(), - [MapeoRPC.InviteResponse.ACCEPT, MapeoRPC.InviteResponse.REJECT], + [LocalPeers.InviteResponse.ACCEPT, LocalPeers.InviteResponse.REJECT], 'One peer accepted, one rejected' ) }) @@ -295,14 +306,14 @@ test('Invite to multiple peers', async (t) => { r2.on('invite', (peerId, invite) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) r3.on('invite', (peerId, invite) => { r3.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -314,27 +325,27 @@ test('Invite to multiple peers', async (t) => { test('Multiple invites to a peer, only one response', async (t) => { t.plan(2) let count = 0 - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { const responses = await Promise.all([ - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), ]) - const expected = Array(3).fill(MapeoRPC.InviteResponse.ACCEPT) + const expected = Array(3).fill(LocalPeers.InviteResponse.ACCEPT) t.alike(responses, expected) }) @@ -344,7 +355,7 @@ test('Multiple invites to a peer, only one response', async (t) => { t.is(count, 3) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -356,13 +367,13 @@ test('Default: invites do not timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.once('peers', async (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }).then( @@ -381,20 +392,21 @@ test('Invite timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.once('peers', async (peers) => { t.exception( - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, - timeout: 5000, + timeout: 1000, encryptionKeys: { auth: randomBytes(32) }, }), TimeoutError ) + // Not working right now, because of the new async code clock.tick(5001) }) @@ -402,8 +414,8 @@ test('Invite timeout', async (t) => { }) test('Reconnect peer and send invite', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -418,7 +430,7 @@ test('Reconnect peer and send invite', async (t) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -426,45 +438,45 @@ test('Reconnect peer and send invite', async (t) => { const [peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) t.is(peers[0].status, 'connected') - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) test('invalid stream', (t) => { - const r1 = new MapeoRPC() + const r1 = new LocalPeers() const regularStream = new Duplex() - // @ts-expect-error t.exception(() => r1.connect(regularStream), 'Invalid stream') }) test('Send device info', async (t) => { - t.plan(3) - - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } r1.on('peers', async (peers) => { t.is(peers.length, 1) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) - }) - - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + r1.sendDeviceInfo(peers[0].deviceId, expectedDeviceInfo) }) replicate(r1, r2) + + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Send device info immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -476,16 +488,18 @@ test('Send device info immediately', async (t) => { r1.sendDeviceInfo(kp2.publicKey.toString('hex'), expectedDeviceInfo) - const [{ deviceId, ...deviceInfo }] = await once(r2, 'device-info') - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Reconnect peer and send device info', async (t) => { - t.plan(6) - - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -497,16 +511,23 @@ test('Reconnect peer and send device info', async (t) => { t.is(r1.peers.length, 1) t.is(r1.peers[0].status, 'disconnected') - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) - }) - replicate(r1, r2) - const [peers] = await once(r1, 'peers') + const [r1peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) - t.is(peers[0].status, 'connected') + t.is(r1peers[0].status, 'connected') + + r1.sendDeviceInfo(r1peers[0].deviceId, expectedDeviceInfo) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) + const [r2Peers] = await once(r2, 'peers') + t.is(r2Peers[0].name, expectedDeviceInfo.name) +}) + +test('connected peer has protomux instance', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + replicate(r1, r2) + const [[peer]] = await once(r1, 'peers') + t.is(peer.status, 'connected') + t.ok(Protomux.isProtomux(peer.protomux)) }) From 0444a0b6a396db1e932f5b6ff8807d0f614818ce Mon Sep 17 00:00:00 2001 From: tomasciccola <117094913+tomasciccola@users.noreply.github.com> Date: Tue, 7 Nov 2023 15:30:27 -0300 Subject: [PATCH 3/6] implement IconApi (#335) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tomás Ciccola Co-authored-by: Andrew Chou --- src/icon-api.js | 243 ++++++++++++++++ tests/icon-api.js | 685 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 928 insertions(+) create mode 100644 src/icon-api.js create mode 100644 tests/icon-api.js diff --git a/src/icon-api.js b/src/icon-api.js new file mode 100644 index 00000000..97d94c56 --- /dev/null +++ b/src/icon-api.js @@ -0,0 +1,243 @@ +export const kGetIconBlob = Symbol('getIcon') + +/** @typedef {import('@mapeo/schema').IconValue['variants']} IconVariants */ +/** @typedef {IconVariants[number]} IconVariant */ + +/** + * @typedef {Object} BitmapOpts + * @property {Extract} mimeType + * @property {IconVariant['pixelDensity']} pixelDensity + * @property {IconVariant['size']} size + * + * @typedef {Object} SvgOpts + * @property {Extract} mimeType + * @property {IconVariant['size']} size + */ + +/** @type {{ [mime in IconVariant['mimeType']]: string }} */ +const MIME_TO_EXTENSION = { + 'image/png': '.png', + 'image/svg+xml': '.svg', +} + +export class IconApi { + #projectId + #dataType + #dataStore + #getMediaBaseUrl + + /** + * @param {Object} opts + * @param {import('./datatype/index.js').DataType< + * import('./datastore/index.js').DataStore<'config'>, + * typeof import('./schema/project.js').iconTable, + * 'icon', + * import('@mapeo/schema').Icon, + * import('@mapeo/schema').IconValue + * >} opts.iconDataType + * @param {import('./datastore/index.js').DataStore<'config'>} opts.iconDataStore + * @param {string} opts.projectId + * @param {() => Promise} opts.getMediaBaseUrl + */ + constructor({ iconDataType, iconDataStore, projectId, getMediaBaseUrl }) { + this.#dataType = iconDataType + this.#dataStore = iconDataStore + this.#projectId = projectId + this.#getMediaBaseUrl = getMediaBaseUrl + } + + /** + * @param {object} icon + * @param {import('@mapeo/schema').IconValue['name']} icon.name + * @param {Array<(BitmapOpts | SvgOpts) & { blob: Buffer }>} icon.variants + * + * @returns {Promise} + */ + async create(icon) { + if (icon.variants.length < 1) { + throw new Error('empty variants array') + } + + const savedVariants = await Promise.all( + icon.variants.map(async ({ blob, ...variant }) => { + const blobVersionId = await this.#dataStore.writeRaw(blob) + + return { + ...variant, + blobVersionId, + pixelDensity: + // Pixel density does not apply to svg variants + // TODO: Ideally @mapeo/schema wouldn't require pixelDensity when the mime type is svg + variant.mimeType === 'image/svg+xml' + ? /** @type {const} */ (1) + : variant.pixelDensity, + } + }) + ) + + const { docId } = await this.#dataType.create({ + schemaName: 'icon', + name: icon.name, + variants: savedVariants, + }) + + return docId + } + + /** + * @param {string} iconId + * @param {BitmapOpts | SvgOpts} opts + * + * @returns {Promise} + */ + async [kGetIconBlob](iconId, opts) { + const iconRecord = await this.#dataType.getByDocId(iconId) + const iconVariant = getBestVariant(iconRecord.variants, opts) + const blob = await this.#dataStore.readRaw(iconVariant.blobVersionId) + return blob + } + + /** + * @param {string} iconId + * @param {BitmapOpts | SvgOpts} opts + * + * @returns {Promise} + */ + async getIconUrl(iconId, opts) { + let base = await this.#getMediaBaseUrl() + + if (!base.endsWith('/')) { + base += '/' + } + + base += `${this.#projectId}/${iconId}/` + + const mimeExtension = MIME_TO_EXTENSION[opts.mimeType] + + if (opts.mimeType === 'image/svg+xml') { + return base + `${opts.size}${mimeExtension}` + } + + return base + `${opts.size}@${opts.pixelDensity}x${mimeExtension}` + } +} + +/** + * @type {Record} + */ +const SIZE_AS_NUMERIC = { + small: 1, + medium: 2, + large: 3, +} + +/** + * Given a list of icon variants returns the variant that most closely matches the desired parameters. + * Rules, in order of precedence: + * + * 1. Matching mime type (throw if no matches) + * 2. Matching size. If no exact match: + * 1. If smaller ones exist, prefer closest smaller size. + * 2. Otherwise prefer closest larger size. + * 3. Matching pixel density. If no exact match: + * 1. If smaller ones exist, prefer closest smaller density. + * 2. Otherwise prefer closest larger density. + * + * @param {IconVariants} variants + * @param {BitmapOpts | SvgOpts} opts + */ +export function getBestVariant(variants, opts) { + const { size: wantedSize, mimeType: wantedMimeType } = opts + // Pixel density doesn't matter for svg so default to 1 + const wantedPixelDensity = + opts.mimeType === 'image/svg+xml' ? 1 : opts.pixelDensity + + if (variants.length === 0) { + throw new Error('No variants exist') + } + + const matchingMime = variants.filter((v) => v.mimeType === wantedMimeType) + + if (matchingMime.length === 0) { + throw new Error( + `No variants with desired mime type ${wantedMimeType} exist` + ) + } + + const wantedSizeNum = SIZE_AS_NUMERIC[wantedSize] + + // Sort the relevant variants based on the desired size and pixel density, using the rules of the preference. + // Sorted from closest match to furthest match. + matchingMime.sort((a, b) => { + const aSizeNum = SIZE_AS_NUMERIC[a.size] + const bSizeNum = SIZE_AS_NUMERIC[b.size] + + const aSizeDiff = aSizeNum - wantedSizeNum + const bSizeDiff = bSizeNum - wantedSizeNum + + // Both variants match desired size, use pixel density to determine preferred match + if (aSizeDiff === 0 && bSizeDiff === 0) { + // Pixel density doesn't matter for svg but prefer lower for consistent results + if (opts.mimeType === 'image/svg+xml') { + return a.pixelDensity <= b.pixelDensity ? -1 : 1 + } + + return determineSortValue( + wantedPixelDensity, + a.pixelDensity, + b.pixelDensity + ) + } + + return determineSortValue(wantedSizeNum, aSizeNum, bSizeNum) + }) + + // Closest match will be first element + return matchingMime[0] +} + +/** + * Determines a sort value based on the order of precedence outlined below. Winning value moves closer to front. + * + * 1. Exactly match `target` + * 2. Closest value smaller than `target` + * 3. Closest value larger than `target` + * + * @param {number} target + * @param {number} a + * @param {number} b + * + * @returns {-1 | 0 | 1} + */ +function determineSortValue(target, a, b) { + const aDiff = a - target + const bDiff = b - target + + // Both match exactly, don't change sort order + if (aDiff === 0 && bDiff === 0) { + return 0 + } + + // a matches but b doesn't, prefer a + if (aDiff === 0 && bDiff !== 0) { + return -1 + } + + // b matches but a doesn't, prefer b + if (bDiff === 0 && aDiff !== 0) { + return 1 + } + + // Both are larger than desired, prefer smaller of the two + if (aDiff > 0 && bDiff > 0) { + return a < b ? -1 : 1 + } + + // Both are smaller than desired, prefer larger of the two + if (aDiff < 0 && bDiff < 0) { + return a < b ? 1 : -1 + } + + // Mix of smaller and larger than desired, prefer smaller of the two + return a < b ? -1 : 1 +} diff --git a/tests/icon-api.js b/tests/icon-api.js new file mode 100644 index 00000000..51c736d4 --- /dev/null +++ b/tests/icon-api.js @@ -0,0 +1,685 @@ +// @ts-check +import test from 'brittle' +import RAM from 'random-access-memory' +import Database from 'better-sqlite3' +import { drizzle } from 'drizzle-orm/better-sqlite3' +import { migrate } from 'drizzle-orm/better-sqlite3/migrator' +import { randomBytes } from 'node:crypto' + +import { IconApi, kGetIconBlob, getBestVariant } from '../src/icon-api.js' +import { DataType } from '../src/datatype/index.js' +import { DataStore } from '../src/datastore/index.js' +import { createCoreManager } from './helpers/core-manager.js' +import { iconTable } from '../src/schema/project.js' +import { IndexWriter } from '../src/index-writer/index.js' + +test('create()', async (t) => { + const { iconApi, iconDataType } = setup() + + const expectedName = 'myIcon' + + const bitmapBlob = randomBytes(128) + const svgBlob = randomBytes(128) + + await t.exception(async () => { + return iconApi.create({ name: expectedName, variants: [] }) + }, 'throws when no variants are provided') + + /** @type {Parameters[0]['variants']} */ + const expectedVariants = [ + { + size: 'small', + pixelDensity: 1, + mimeType: 'image/png', + blob: bitmapBlob, + }, + { + size: 'small', + mimeType: 'image/svg+xml', + blob: svgBlob, + }, + ] + + const iconId = await iconApi.create({ + name: expectedName, + variants: expectedVariants, + }) + + t.ok(iconId, 'returns document id') + + const doc = await iconDataType.getByDocId(iconId) + + t.ok(doc, 'icon document created') + t.is(doc.name, expectedName, 'document has expected icon name') + t.is( + doc.variants.length, + expectedVariants.length, + 'document has expected icon name' + ) + + for (const expected of expectedVariants) { + const match = doc.variants.find((v) => { + const mimeTypeMatches = v.mimeType === expected.mimeType + const sizeMatches = v.size === expected.size + + if (expected.mimeType === 'image/svg+xml') { + return mimeTypeMatches && sizeMatches + } + + const pixelDensityMatches = v.pixelDensity === expected.pixelDensity + return mimeTypeMatches && sizeMatches && pixelDensityMatches + }) + + t.ok(match, 'variant is saved') + + // TODO: Do we need to check the blobVersionId field? + } +}) + +test('[kGetIconBlob]()', async (t) => { + const { iconApi } = setup() + + const expectedName = 'myIcon' + + const bitmapBlob = randomBytes(128) + const svgBlob = randomBytes(128) + + /** @type {Parameters[0]['variants']} */ + const expectedVariants = [ + { + size: 'small', + pixelDensity: 1, + mimeType: 'image/png', + blob: bitmapBlob, + }, + { + size: 'large', + mimeType: 'image/svg+xml', + blob: svgBlob, + }, + ] + + const iconId = await iconApi.create({ + name: expectedName, + variants: expectedVariants, + }) + + // Bitmap exact + { + const result = await iconApi[kGetIconBlob](iconId, { + size: 'small', + pixelDensity: 1, + mimeType: 'image/png', + }) + + t.alike(result, bitmapBlob, 'returns expected bitmap blob') + } + + // SVG exact + { + const result = await iconApi[kGetIconBlob](iconId, { + size: 'large', + mimeType: 'image/svg+xml', + }) + + t.alike(result, svgBlob, 'returns expected svg blob') + } + + /// See more extensive non-exact testing in getBestVariant() tests further down + + // Bitmap non-exact + { + const result = await iconApi[kGetIconBlob](iconId, { + size: 'medium', + pixelDensity: 2, + mimeType: 'image/png', + }) + + t.alike(result, bitmapBlob, 'returns expected bitmap blob') + } + + // SVG non-exact + { + const result = await iconApi[kGetIconBlob](iconId, { + size: 'medium', + mimeType: 'image/svg+xml', + }) + + t.alike(result, svgBlob, 'returns expected svg blob') + } +}) + +test(`getIconUrl()`, async (t) => { + let mediaBaseUrl = 'http://127.0.0.1:8080/icons/' + + const { iconApi, projectId } = setup({ + getMediaBaseUrl: async () => mediaBaseUrl, + }) + + const iconId = randomBytes(32).toString('hex') + + { + const url = await iconApi.getIconUrl(iconId, { + size: 'small', + mimeType: 'image/png', + pixelDensity: 1, + }) + + t.is( + url, + mediaBaseUrl + `${projectId}/${iconId}/small@1x.png`, + 'returns expected bitmap icon url' + ) + } + + { + const url = await iconApi.getIconUrl(iconId, { + size: 'small', + mimeType: 'image/svg+xml', + }) + + t.is( + url, + mediaBaseUrl + `${projectId}/${iconId}/small.svg`, + 'returns expected svg icon url' + ) + } + + // Change media base url (e.g. port changes) + mediaBaseUrl = 'http://127.0.0.1:3000/' + + { + const url = await iconApi.getIconUrl(iconId, { + size: 'medium', + mimeType: 'image/png', + pixelDensity: 2, + }) + + t.is( + url, + mediaBaseUrl + `${projectId}/${iconId}/medium@2x.png`, + 'returns expected bitmap icon url after media base url changes' + ) + } + + { + const url = await iconApi.getIconUrl(iconId, { + size: 'large', + mimeType: 'image/svg+xml', + }) + + t.is( + url, + mediaBaseUrl + `${projectId}/${iconId}/large.svg`, + 'returns expected svg icon url after media base url changes' + ) + } +}) + +test('getBestVariant() - no variants exist', (t) => { + t.exception(() => { + return getBestVariant([], { + mimeType: 'image/png', + size: 'small', + pixelDensity: 1, + }) + }, 'throws when no variants exist') +}) + +test('getBestVariant() - specify mimeType', (t) => { + /** @type {Pick} */ + const common = { pixelDensity: 1, size: 'small' } + + const pngVariant = createIconVariant({ + ...common, + mimeType: 'image/png', + }) + + const svgVariant = createIconVariant({ + ...common, + mimeType: 'image/svg+xml', + }) + + t.test('request mime type with match present', (st) => { + /** @type {Array<[import('@mapeo/schema').Icon['variants'][number]['mimeType'], import('@mapeo/schema').Icon['variants'][number]]>} */ + const pairs = [ + ['image/png', pngVariant], + ['image/svg+xml', svgVariant], + ] + + for (const [mimeType, expectedVariant] of pairs) { + const result = getBestVariant([pngVariant, svgVariant], { + ...common, + mimeType, + }) + + st.alike( + result, + getBestVariant([pngVariant, svgVariant].reverse(), { + ...common, + mimeType, + }), + 'same result regardless of variants order' + ) + + st.alike( + result, + expectedVariant, + `returns variant with desired mime type (${mimeType})` + ) + } + }) + + t.test('request a mime type with no match present', (st) => { + st.exception(() => { + getBestVariant([pngVariant], { + ...common, + mimeType: 'image/svg+xml', + }) + }, 'throws when no match for svg exists') + + st.exception(() => { + getBestVariant([svgVariant], { + ...common, + mimeType: 'image/png', + }) + }, 'throws when no match for png exists') + }) +}) + +test('getBestVariant() - specify size', (t) => { + /** @type {Pick} */ + const common = { pixelDensity: 1, mimeType: 'image/png' } + + const smallVariant = createIconVariant({ + ...common, + size: 'small', + }) + + const mediumVariant = createIconVariant({ + ...common, + size: 'medium', + }) + + const largeVariant = createIconVariant({ + ...common, + size: 'large', + }) + + t.test('request size with match present', (st) => { + /** @type {Array<[import('@mapeo/schema').Icon['variants'][number]['size'], import('@mapeo/schema').Icon['variants'][number]]>} */ + const pairs = [ + ['small', smallVariant], + ['medium', mediumVariant], + ['large', largeVariant], + ] + for (const [size, expectedVariant] of pairs) { + const result = getBestVariant( + [smallVariant, mediumVariant, largeVariant], + { ...common, size } + ) + + st.alike( + result, + getBestVariant([smallVariant, mediumVariant, largeVariant].reverse(), { + ...common, + size, + }), + 'same result regardless of variants order' + ) + + st.alike( + result, + expectedVariant, + `returns variant with desired size (${size})` + ) + } + }) + + t.test('request size with only smaller existing', (st) => { + const result = getBestVariant([smallVariant, mediumVariant], { + ...common, + size: 'large', + }) + + st.alike( + result, + getBestVariant([smallVariant, mediumVariant].reverse(), { + ...common, + size: 'large', + }), + 'same result regardless of variants order' + ) + + st.alike(result, mediumVariant, 'returns closest smaller size') + }) + + t.test('request size with both larger and smaller existing', (st) => { + const result = getBestVariant([smallVariant, largeVariant], { + ...common, + size: 'medium', + }) + + st.alike( + result, + getBestVariant([smallVariant, largeVariant].reverse(), { + ...common, + size: 'medium', + }), + 'same result regardless of variants order' + ) + + st.alike(result, smallVariant, 'returns smaller size') + }) + + t.test('request size with only larger existing', (st) => { + const result = getBestVariant([mediumVariant, largeVariant], { + ...common, + size: 'small', + }) + + st.alike( + result, + getBestVariant([mediumVariant, largeVariant].reverse(), { + ...common, + size: 'small', + }), + 'same result regardless of variants order' + ) + + st.alike(result, mediumVariant, 'returns closest larger size') + }) +}) + +test('getBestVariant() - specify pixel density', (t) => { + /** @type {Pick} */ + const common = { size: 'small', mimeType: 'image/png' } + + const density1Variant = createIconVariant({ + ...common, + pixelDensity: 1, + }) + + const density2Variant = createIconVariant({ + ...common, + pixelDensity: 2, + }) + + const density3Variant = createIconVariant({ + ...common, + pixelDensity: 3, + }) + + t.test('request pixel density with match present', (st) => { + /** @type {Array<[import('@mapeo/schema').Icon['variants'][number]['pixelDensity'], import('@mapeo/schema').Icon['variants'][number]]>} */ + const pairs = [ + [1, density1Variant], + [2, density2Variant], + [3, density3Variant], + ] + for (const [pixelDensity, expectedVariant] of pairs) { + const result = getBestVariant( + [density1Variant, density2Variant, density3Variant], + { ...common, pixelDensity } + ) + + st.alike( + result, + getBestVariant( + [density1Variant, density2Variant, density3Variant].reverse(), + { ...common, pixelDensity } + ), + 'same result regardless of variants order' + ) + + st.alike( + result, + expectedVariant, + `returns variant with desired pixel density (${pixelDensity})` + ) + } + }) + + t.test('request pixel density with only smaller existing', (st) => { + const result = getBestVariant([density1Variant, density2Variant], { + ...common, + pixelDensity: 3, + }) + + st.alike( + result, + getBestVariant([density1Variant, density2Variant].reverse(), { + ...common, + pixelDensity: 3, + }), + 'same result regardless of variants order' + ) + + st.alike(result, density2Variant, 'returns closest smaller density') + }) + + t.test( + 'request pixel density with both larger and smaller existing', + (st) => { + const result = getBestVariant([density1Variant, density3Variant], { + ...common, + pixelDensity: 2, + }) + + st.alike( + result, + getBestVariant([density1Variant, density3Variant].reverse(), { + ...common, + pixelDensity: 2, + }), + 'same result regardless of variants order' + ) + + st.alike(result, density1Variant, 'returns smaller density') + } + ) + + t.test('request pixel density with only larger existing', (st) => { + const result = getBestVariant([density2Variant, density3Variant], { + ...common, + pixelDensity: 1, + }) + + st.alike( + result, + getBestVariant([density2Variant, density3Variant].reverse(), { + ...common, + pixelDensity: 1, + }), + 'same result regardless of variants order' + ) + + st.alike(result, density2Variant, 'returns closest larger density') + }) +}) + +test('getBestVariant() - params prioritization', (t) => { + const wantedSizePngVariant = createIconVariant({ + mimeType: 'image/png', + pixelDensity: 1, + size: 'small', + }) + + const wantedPixelDensityPngVariant = createIconVariant({ + mimeType: 'image/png', + pixelDensity: 2, + size: 'medium', + }) + + const wantedSizeSvgVariant = createIconVariant({ + mimeType: 'image/svg+xml', + pixelDensity: 1, + size: 'small', + }) + + const wantedPixelDensitySvgVariant = createIconVariant({ + mimeType: 'image/svg+xml', + pixelDensity: 2, + size: 'medium', + }) + + const result = getBestVariant( + [ + wantedSizePngVariant, + wantedPixelDensityPngVariant, + wantedSizeSvgVariant, + wantedPixelDensitySvgVariant, + ], + { + mimeType: 'image/svg+xml', + size: 'small', + } + ) + + t.alike( + result, + getBestVariant( + [ + wantedSizePngVariant, + wantedPixelDensityPngVariant, + wantedSizeSvgVariant, + wantedPixelDensitySvgVariant, + ].reverse(), + { + mimeType: 'image/svg+xml', + size: 'small', + } + ), + 'same result regardless of variants order' + ) + + t.alike(result, wantedSizeSvgVariant, 'mime type > size > pixel density') +}) + +// TODO: The IconApi doesn't allow creating svg variants with a custom pixel density, so maybe can remove this test? +test('getBestVariant() - svg requests are not affected by pixel density', (t) => { + /** @type {Pick} */ + const common = { size: 'small', mimeType: 'image/svg+xml' } + + const variant1 = createIconVariant({ ...common, pixelDensity: 1 }) + const variant2 = createIconVariant({ ...common, pixelDensity: 2 }) + const variant3 = createIconVariant({ ...common, pixelDensity: 3 }) + + const result = getBestVariant([variant1, variant2, variant3], { + size: 'small', + mimeType: 'image/svg+xml', + }) + + t.alike( + result, + getBestVariant([variant1, variant2, variant3].reverse(), { + mimeType: 'image/svg+xml', + size: 'small', + }), + 'same result regardless of variants order' + ) + + t.alike(result, variant1) +}) + +// TODO: Currently fails. Not sure if we'd run into this situation often in reality +test( + 'getBestVariant - multiple exact matches return deterministic result', + { todo: true }, + (t) => { + const variantA = createIconVariant({ + size: 'small', + pixelDensity: 1, + mimeType: 'image/svg+xml', + }) + const variantB = createIconVariant({ + size: 'small', + pixelDensity: 1, + mimeType: 'image/svg+xml', + }) + + const result = getBestVariant([variantA, variantB], { + size: 'small', + mimeType: 'image/svg+xml', + }) + + t.alike( + result, + getBestVariant([variantA, variantB].reverse(), { + mimeType: 'image/svg+xml', + size: 'small', + }), + 'same result regardless of variants order' + ) + + t.alike(result, variantA) + } +) + +/** + * + * @param {{ getMediaBaseUrl?: () => Promise }} [opts] + */ +function setup({ + getMediaBaseUrl = async () => 'http://127.0.0.1:8080/icons', +} = {}) { + const cm = createCoreManager() + const sqlite = new Database(':memory:') + const db = drizzle(sqlite) + + migrate(db, { + migrationsFolder: new URL('../drizzle/project', import.meta.url).pathname, + }) + + const indexWriter = new IndexWriter({ + tables: [iconTable], + sqlite, + }) + + const iconDataStore = new DataStore({ + namespace: 'config', + coreManager: cm, + storage: () => new RAM(), + batch: async (entries) => indexWriter.batch(entries), + }) + + const iconDataType = new DataType({ + dataStore: iconDataStore, + table: iconTable, + db, + }) + + const projectId = randomBytes(32).toString('hex') + + const iconApi = new IconApi({ + iconDataStore, + iconDataType, + projectId, + getMediaBaseUrl, + }) + + return { + projectId, + iconApi, + iconDataType, + } +} + +function createRandomVersionId(index = 0) { + return randomBytes(32).toString('hex') + `/${index}` +} + +/** + * @param {object} opts + * @param {import('@mapeo/schema').Icon['variants'][number]['size']} opts.size + * @param {import('@mapeo/schema').Icon['variants'][number]['mimeType']} opts.mimeType + * @param {import('@mapeo/schema').Icon['variants'][number]['pixelDensity']} opts.pixelDensity + * + * @returns {import('@mapeo/schema').Icon['variants'][number]} + */ +function createIconVariant(opts) { + return { + ...opts, + blobVersionId: createRandomVersionId(), + } +} From a32cab03c0111e41db44b89abfc8776657dd1b9e Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 15:52:37 +0900 Subject: [PATCH 4/6] feat: integrate LocalDiscovery & LocalPeers (#358) --- src/local-peers.js | 21 ++++++++---- src/mapeo-manager.js | 67 +++++++++++++++++++++++++++---------- src/mapeo-project.js | 34 ++++++++++++++++--- src/sync/sync-controller.js | 35 +++---------------- test-types/data-types.ts | 2 +- 5 files changed, 98 insertions(+), 61 deletions(-) diff --git a/src/local-peers.js b/src/local-peers.js index 5989a5bd..32c0d230 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) * @property {string | undefined} name */ /** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ -/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ /** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ /** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ @@ -57,7 +57,7 @@ class Peer { #name #connectedAt = 0 #disconnectedAt = 0 - /** @type {Protomux} */ + /** @type {Protomux} */ #protomux /** @@ -103,7 +103,7 @@ class Peer { } } } - /** @param {Protomux} protomux */ + /** @param {Protomux} protomux */ connect(protomux) { this.#protomux = protomux /* c8 ignore next 3 */ @@ -166,7 +166,9 @@ class Peer { /** * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status + * @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received + * @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter) */ /** @extends {TypedEmitter} */ @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter { stream.userData = protomux this.#opening.add(stream.opened) + protomux.pair( + { protocol: 'hypercore/alpha' }, + /** @param {Buffer} discoveryKey */ async (discoveryKey) => { + this.emit('discovery-key', discoveryKey, stream.rawStream) + } + ) + // No need to connect error handler to stream because Protomux does this, // and errors are eventually handled by #closePeer @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter { /** * @param {Buffer} publicKey - * @param {Protomux} protomux + * @param {Protomux} protomux */ #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - const wasConnected = peer.info.status === 'connected' peer.connect(protomux) - if (!wasConnected) this.#emitPeers() + this.#emitPeers() + this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info)) } /** @param {Buffer} publicKey */ diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 32965a23..e37110e2 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -17,6 +17,8 @@ import { ProjectKeys } from './generated/keys.js' import { deNullify, getDeviceId, + keyToId, + openedNoiseSecretStream, projectIdToNonce, projectKeyToId, projectKeyToPublicId, @@ -24,6 +26,7 @@ import { import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' +import { LocalDiscovery } from './discovery/local-discovery.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -48,8 +51,9 @@ export class MapeoManager { #coreStorage #dbFolder #deviceId - #rpc + #localPeers #invite + #localDiscovery /** * @param {Object} opts @@ -69,7 +73,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new LocalPeers() + this.#localPeers = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ @@ -79,7 +83,7 @@ export class MapeoManager { this.#activeProjects = new Map() this.#invite = new InviteApi({ - rpc: this.#rpc, + rpc: this.#localPeers, queries: { isMember: async (projectId) => { const projectExists = this.#db @@ -99,17 +103,43 @@ export class MapeoManager { if (typeof coreStorage === 'string') { const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS) // @ts-ignore - this.#coreStorage = Hypercore.createStorage(coreStorage, { pool }) + this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool }) } else { this.#coreStorage = coreStorage } + + this.#localDiscovery = new LocalDiscovery({ + identityKeypair: this.#keyManager.getIdentityKeypair(), + }) + this.#localDiscovery.on('connection', this.replicate.bind(this)) } /** * MapeoRPC instance, used for tests */ get [kRPC]() { - return this.#rpc + return this.#localPeers + } + + /** + * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for + * local (trusted) connections, because the RPC channel key is public + * + * @param {import('@hyperswarm/secret-stream')} noiseStream + */ + replicate(noiseStream) { + const replicationStream = this.#localPeers.connect(noiseStream) + Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) + .then(([{ name }, openedNoiseStream]) => { + if (openedNoiseStream.destroyed || !name) return + const peerId = keyToId(openedNoiseStream.remotePublicKey) + return this.#localPeers.sendDeviceInfo(peerId, { name }) + }) + .catch((e) => { + // Ignore error but log + console.error('Failed to send device info to peer', e) + }) + return replicationStream } /** @@ -205,15 +235,10 @@ export class MapeoManager { }) // 4. Create MapeoProject instance - const project = new MapeoProject({ - ...this.#projectStorage(projectId), + const project = this.#createProjectInstance({ encryptionKeys, - keyManager: this.#keyManager, projectKey: projectKeypair.publicKey, projectSecretKey: projectKeypair.secretKey, - sharedDb: this.#db, - sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, }) // 5. Write project name and any other relevant metadata to project instance @@ -263,19 +288,25 @@ export class MapeoManager { projectId ) - const project = new MapeoProject({ + const project = this.#createProjectInstance(projectKeys) + + // 3. Keep track of project instance as we know it's a properly existing project + this.#activeProjects.set(projectPublicId, project) + + return project + } + + /** @param {ProjectKeys} projectKeys */ + #createProjectInstance(projectKeys) { + const projectId = keyToId(projectKeys.projectKey) + return new MapeoProject({ ...this.#projectStorage(projectId), ...projectKeys, keyManager: this.#keyManager, sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, + localPeers: this.#localPeers, }) - - // 3. Keep track of project instance as we know it's a properly existing project - this.#activeProjects.set(projectPublicId, project) - - return project } /** diff --git a/src/mapeo-project.js b/src/mapeo-project.js index ed24cbc5..cf980517 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -33,6 +33,7 @@ import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' import { SyncController } from './sync/sync-controller.js' +import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -67,7 +68,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./local-peers.js').LocalPeers} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.localPeers * */ constructor({ @@ -79,7 +80,7 @@ export class MapeoProject { projectKey, projectSecretKey, encryptionKeys, - rpc, + localPeers, }) { this.#deviceId = getDeviceId(keyManager) this.#projectId = projectKeyToId(projectKey) @@ -237,7 +238,7 @@ export class MapeoProject { // @ts-expect-error encryptionKeys, projectKey, - rpc, + rpc: localPeers, dataTypes: { deviceInfo: this.#dataTypes.deviceInfo, project: this.#dataTypes.projectSettings, @@ -249,6 +250,26 @@ export class MapeoProject { capabilities: this.#capabilities, }) + // Replicate already connected local peers + for (const peer of localPeers.peers) { + if (peer.status !== 'connected') continue + this.#syncController.replicate(peer.protomux) + } + + localPeers.on('discovery-key', (discoveryKey, stream) => { + // The core identified by this discovery key might not be part of this + // project, but we can't know that so we will request it from the peer if + // we don't have it. The peer will not return the core key unless it _is_ + // part of this project + this.#coreManager.handleDiscoveryKey(discoveryKey, stream) + }) + + // When a new peer is found, try to replicate (if it is not a member of the + // project it will fail the capability check and be ignored) + localPeers.on('peer-add', (peer) => { + this.#syncController.replicate(peer.protomux) + }) + ///////// 4. Write core ownership record const deferred = pDefer() @@ -396,11 +417,14 @@ export class MapeoProject { /** * - * @param {import('./types.js').ReplicationStream} stream + * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ [kReplicate](stream) { - return this.#syncController.replicate(stream) + const replicationStream = Hypercore.createProtocolStream(stream, {}) + const protomux = replicationStream.noiseStream.userData + // @ts-ignore - got fed up jumping through hoops to keep TS heppy + return this.#syncController.replicate(protomux) } /** diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js index f68d2f6e..a32b6cb9 100644 --- a/src/sync/sync-controller.js +++ b/src/sync/sync-controller.js @@ -1,6 +1,4 @@ -import Hypercore from 'hypercore' import { TypedEmitter } from 'tiny-typed-emitter' -import Protomux from 'protomux' import { SyncState } from './sync-state.js' import { PeerSyncController } from './peer-sync-controller.js' @@ -8,7 +6,7 @@ export class SyncController extends TypedEmitter { #syncState #coreManager #capabilities - /** @type {Map} */ + /** @type {Map} */ #peerSyncControllers = new Map() /** @@ -30,35 +28,10 @@ export class SyncController extends TypedEmitter { } /** - * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + * @param {import('protomux')} protomux A protomux instance */ - replicate(stream) { - if ( - Protomux.isProtomux(stream) || - ('userData' in stream && Protomux.isProtomux(stream.userData)) || - ('noiseStream' in stream && - Protomux.isProtomux(stream.noiseStream.userData)) - ) { - console.warn( - 'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten' - ) - } - const protocolStream = Hypercore.createProtocolStream(stream, { - ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => { - return this.#coreManager.handleDiscoveryKey(discoveryKey, stream) - }, - }) - const protomux = - // Need to coerce this until we update Hypercore.createProtocolStream types - /** @type {import('protomux')} */ ( - protocolStream.noiseStream.userData - ) - if (!protomux) throw new Error('Invalid stream') - - if (this.#peerSyncControllers.has(protomux)) { - console.warn('Already replicating to this stream') - return - } + replicate(protomux) { + if (this.#peerSyncControllers.has(protomux)) return const peerSyncController = new PeerSyncController({ protomux, diff --git a/test-types/data-types.ts b/test-types/data-types.ts index 10b2fcb1..aa24bed8 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new LocalPeers(), + localPeers: new LocalPeers(), }) ///// Observations From ab77e5119b2a5904d865f38cec58e1722c7061a3 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 15:58:59 +0900 Subject: [PATCH 5/6] feat: `listLocalPeers()` & `local-peers` event (#360) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt --- src/mapeo-manager.js | 52 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index e37110e2..99922662 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -27,6 +27,7 @@ import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' +import { TypedEmitter } from 'tiny-typed-emitter' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -40,7 +41,19 @@ const MAX_FILE_DESCRIPTORS = 768 export const kRPC = Symbol('rpc') -export class MapeoManager { +/** + * @typedef {Omit} PublicPeerInfo + */ + +/** + * @typedef {object} MapeoManagerEvents + * @property {(peers: PublicPeerInfo[]) => void} local-peers Emitted when the list of connected peers changes (new ones added, or connection status changes) + */ + +/** + * @extends {TypedEmitter} + */ +export class MapeoManager extends TypedEmitter { #keyManager #projectSettingsIndexWriter #db @@ -62,6 +75,7 @@ export class MapeoManager { * @param {string | import('./types.js').CoreStorage} opts.coreStorage Folder for hypercore storage or a function that returns a RandomAccessStorage instance */ constructor({ rootKey, dbFolder, coreStorage }) { + super() this.#dbFolder = dbFolder const sqlite = new Database( dbFolder === ':memory:' @@ -74,6 +88,10 @@ export class MapeoManager { }) this.#localPeers = new LocalPeers() + this.#localPeers.on('peers', (peers) => { + this.emit('local-peers', omitPeerProtomux(peers)) + }) + this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ @@ -456,4 +474,36 @@ export class MapeoManager { get invite() { return this.#invite } + + /** + * @returns {Promise} + */ + async listLocalPeers() { + return omitPeerProtomux(this.#localPeers.peers) + } +} + +// We use the `protomux` property of connected peers internally, but we don't +// expose it to the API. I have avoided using a private symbol for this for fear +// that we could accidentally keep references around of protomux instances, +// which could cause a memory leak (it shouldn't, but just to eliminate the +// possibility) + +/** + * Remove the protomux property of connected peers + * + * @param {import('./local-peers.js').PeerInfo[]} peers + * @returns {PublicPeerInfo[]} + */ +function omitPeerProtomux(peers) { + return peers.map( + ({ + // @ts-ignore + // eslint-disable-next-line no-unused-vars + protomux, + ...publicPeerInfo + }) => { + return publicPeerInfo + } + ) } From ca95b2f1105d850c483a9da69a4ed0e0c060ae87 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 16:10:05 +0900 Subject: [PATCH 6/6] feat: add `$sync` API methods (#361) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt * feat: add $sync API methods For now this simplifies the API (because we are only supporting local sync, not remote sync over the internet) to: - `project.$sync.getState()` - `project.$sync.start()` - `project.$sync.stop()` - Events - `sync-state` It's currently not possible to stop local discovery, nor is it possible to stop sync of the metadata namespaces (auth, config, blobIndex). The start and stop methods stop the sync of the data and blob namespaces. Fixes #134. Stacked on #360, #358 and #356. --- src/mapeo-manager.js | 12 ++++-- src/mapeo-project.js | 35 +++++++++++----- src/sync/sync-api.js | 82 +++++++++++++++++++++++++++++++++++++ src/sync/sync-controller.js | 44 -------------------- 4 files changed, 115 insertions(+), 58 deletions(-) create mode 100644 src/sync/sync-api.js delete mode 100644 src/sync/sync-controller.js diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 99922662..2ac56b05 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -40,6 +40,7 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db' const MAX_FILE_DESCRIPTORS = 768 export const kRPC = Symbol('rpc') +export const kManagerReplicate = Symbol('replicate manager') /** * @typedef {Omit} PublicPeerInfo @@ -129,7 +130,7 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), }) - this.#localDiscovery.on('connection', this.replicate.bind(this)) + this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) } /** @@ -140,12 +141,15 @@ export class MapeoManager extends TypedEmitter { } /** - * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for - * local (trusted) connections, because the RPC channel key is public + * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects + * the Mapeo RPC channel and allows invites. All active projects will sync + * automatically to this replication stream. Only use for local (trusted) + * connections, because the RPC channel key is public. To sync a specific + * project without connecting RPC, use project[kProjectReplication]. * * @param {import('@hyperswarm/secret-stream')} noiseStream */ - replicate(noiseStream) { + [kManagerReplicate](noiseStream) { const replicationStream = this.#localPeers.connect(noiseStream) Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) .then(([{ name }, openedNoiseStream]) => { diff --git a/src/mapeo-project.js b/src/mapeo-project.js index cf980517..66992296 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -32,7 +32,7 @@ import { import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' -import { SyncController } from './sync/sync-controller.js' +import { SyncApi, kSyncReplicate } from './sync/sync-api.js' import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -42,7 +42,7 @@ const INDEXER_STORAGE_FOLDER_NAME = 'indexer' export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') -export const kReplicate = Symbol('replicate') +export const kProjectReplicate = Symbol('replicate project') export class MapeoProject { #projectId @@ -56,7 +56,7 @@ export class MapeoProject { #capabilities #ownershipWriteDone #memberApi - #syncController + #syncApi /** * @param {Object} opts @@ -245,15 +245,17 @@ export class MapeoProject { }, }) - this.#syncController = new SyncController({ + this.#syncApi = new SyncApi({ coreManager: this.#coreManager, capabilities: this.#capabilities, }) + ///////// 4. Wire up sync + // Replicate already connected local peers for (const peer of localPeers.peers) { if (peer.status !== 'connected') continue - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) } localPeers.on('discovery-key', (discoveryKey, stream) => { @@ -267,10 +269,10 @@ export class MapeoProject { // When a new peer is found, try to replicate (if it is not a member of the // project it will fail the capability check and be ignored) localPeers.on('peer-add', (peer) => { - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) }) - ///////// 4. Write core ownership record + ///////// 5. Write core ownership record const deferred = pDefer() // Avoid uncaught rejection. If this is rejected then project.ready() will reject @@ -365,6 +367,10 @@ export class MapeoProject { return this.#memberApi } + get $sync() { + return this.#syncApi + } + /** * @param {Partial} settings * @returns {Promise} @@ -416,15 +422,24 @@ export class MapeoProject { } /** + * Replicate a project to a @hyperswarm/secret-stream. Invites will not + * function because the RPC channel is not connected for project replication, + * and only this project will replicate (to replicate multiple projects you + * need to replicate the manager instance via manager[kManagerReplicate]) * * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ - [kReplicate](stream) { - const replicationStream = Hypercore.createProtocolStream(stream, {}) + [kProjectReplicate](stream) { + const replicationStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: async (discoveryKey) => { + this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream) + }, + }) const protomux = replicationStream.noiseStream.userData // @ts-ignore - got fed up jumping through hoops to keep TS heppy - return this.#syncController.replicate(protomux) + this.#syncApi[kSyncReplicate](protomux) + return replicationStream } /** diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js new file mode 100644 index 00000000..9508adc1 --- /dev/null +++ b/src/sync/sync-api.js @@ -0,0 +1,82 @@ +import { TypedEmitter } from 'tiny-typed-emitter' +import { SyncState } from './sync-state.js' +import { PeerSyncController } from './peer-sync-controller.js' + +export const kSyncReplicate = Symbol('replicate sync') + +/** + * @typedef {object} SyncEvents + * @property {(syncState: import('./sync-state.js').State) => void} sync-state + */ + +/** + * @extends {TypedEmitter} + */ +export class SyncApi extends TypedEmitter { + syncState + #coreManager + #capabilities + /** @type {Map} */ + #peerSyncControllers = new Map() + /** @type {Set<'local' | 'remote'>} */ + #dataSyncEnabled = new Set() + + /** + * + * @param {object} opts + * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {number} [opts.throttleMs] + */ + constructor({ coreManager, throttleMs = 200, capabilities }) { + super() + this.#coreManager = coreManager + this.#capabilities = capabilities + this.syncState = new SyncState({ coreManager, throttleMs }) + this.syncState.on('state', this.emit.bind(this, 'sync-state')) + } + + getState() { + return this.syncState.getState() + } + + /** + * Start syncing data cores + */ + start() { + if (this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.add('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.enableDataSync() + } + } + + /** + * Stop syncing data cores (metadata cores will continue syncing in the background) + */ + stop() { + if (!this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.delete('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.disableDataSync() + } + } + + /** + * @param {import('protomux')} protomux A protomux instance + */ + [kSyncReplicate](protomux) { + if (this.#peerSyncControllers.has(protomux)) return + + const peerSyncController = new PeerSyncController({ + protomux, + coreManager: this.#coreManager, + syncState: this.syncState, + capabilities: this.#capabilities, + }) + if (this.#dataSyncEnabled.has('local')) { + peerSyncController.enableDataSync() + } + this.#peerSyncControllers.set(protomux, peerSyncController) + } +} diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js deleted file mode 100644 index a32b6cb9..00000000 --- a/src/sync/sync-controller.js +++ /dev/null @@ -1,44 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { SyncState } from './sync-state.js' -import { PeerSyncController } from './peer-sync-controller.js' - -export class SyncController extends TypedEmitter { - #syncState - #coreManager - #capabilities - /** @type {Map} */ - #peerSyncControllers = new Map() - - /** - * - * @param {object} opts - * @param {import('../core-manager/index.js').CoreManager} opts.coreManager - * @param {import("../capabilities.js").Capabilities} opts.capabilities - * @param {number} [opts.throttleMs] - */ - constructor({ coreManager, throttleMs = 200, capabilities }) { - super() - this.#coreManager = coreManager - this.#capabilities = capabilities - this.#syncState = new SyncState({ coreManager, throttleMs }) - } - - getState() { - return this.#syncState.getState() - } - - /** - * @param {import('protomux')} protomux A protomux instance - */ - replicate(protomux) { - if (this.#peerSyncControllers.has(protomux)) return - - const peerSyncController = new PeerSyncController({ - protomux, - coreManager: this.#coreManager, - syncState: this.#syncState, - capabilities: this.#capabilities, - }) - this.#peerSyncControllers.set(protomux, peerSyncController) - } -}