Skip to content

Commit

Permalink
chore: don't send "haves" to unapproved peers
Browse files Browse the repository at this point in the history
This avoids leaking "have" data to unapproved peers. Most likely, this
is peers with the project key that aren't in the project.

Partly addresses [#268].

[#268]: #268
  • Loading branch information
EvanHahn committed May 29, 2024
1 parent 45c8510 commit bf7e936
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 5 deletions.
39 changes: 34 additions & 5 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Hypercore from 'hypercore'
import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { Logger } from '../logger.js'
import { NAMESPACES } from '../constants.js'
import { keyToId, noop } from '../utils.js'
import { ExhaustivenessError, keyToId, noop } from '../utils.js'
import { coresTable } from '../schema/project.js'
import * as rle from './bitfield-rle.js'
import { CoreIndex } from './core-index.js'
Expand All @@ -27,6 +27,10 @@ export const kCoreManagerReplicate = Symbol('replicate core manager')
* @property {(coreRecord: CoreRecord) => void} add-core
* @property {(namespace: Namespace, msg: { coreDiscoveryId: string, peerId: string, start: number, bitfield: Uint32Array }) => void} peer-have
*/
/**
* TODO: Use a more precise type.
* @typedef {any} Peer
*/

/**
* @extends {TypedEmitter<Events>}
Expand All @@ -38,6 +42,7 @@ export class CoreManager extends TypedEmitter {
#creatorCore
#projectKey
#queries
#getSyncCapabilities
#encryptionKeys
#projectExtension
/** @type {'opened' | 'closing' | 'closed'} */
Expand Down Expand Up @@ -67,6 +72,7 @@ export class CoreManager extends TypedEmitter {
* @param {Partial<Record<Namespace, Buffer>>} [options.encryptionKeys] Encryption keys for each namespace
* @param {import('hypercore').HypercoreStorage} options.storage Folder to store all hypercore data
* @param {boolean} [options.autoDownload=true] Immediately start downloading cores - should only be set to false for tests
* @param {(peer: Readonly<Peer>) => Promise<Record<Namespace, 'allowed' | 'blocked'>>} options.getSyncCapabilities
* @param {Logger} [options.logger]
*/
constructor({
Expand All @@ -77,6 +83,7 @@ export class CoreManager extends TypedEmitter {
encryptionKeys = {},
storage,
autoDownload = true,
getSyncCapabilities,
logger,
}) {
super()
Expand All @@ -96,6 +103,7 @@ export class CoreManager extends TypedEmitter {
this.#projectKey = projectKey
this.#encryptionKeys = encryptionKeys
this.#autoDownload = autoDownload
this.#getSyncCapabilities = getSyncCapabilities

// Pre-prepare SQL statement for better performance
this.#queries = {
Expand Down Expand Up @@ -401,7 +409,7 @@ export class CoreManager extends TypedEmitter {

/**
* @param {ProjectExtension} msg
* @param {any} peer
* @param {Peer} peer
*/
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
const message = ProjectExtension.create()
Expand All @@ -426,7 +434,7 @@ export class CoreManager extends TypedEmitter {

/**
* @param {Omit<HaveMsg, 'namespace'> & { namespace: Namespace | 'UNRECOGNIZED' }} msg
* @param {any} peer
* @param {Peer} peer
*/
#handleHaveMessage(msg, peer) {
const { start, discoveryKey, bitfield, namespace } = msg
Expand All @@ -444,7 +452,7 @@ export class CoreManager extends TypedEmitter {

/**
*
* @param {any} peer
* @param {Peer} peer
* @param {Iterable<{ core: Hypercore<Hypercore.ValueEncoding, Buffer>, namespace: Namespace }>} cores
*/
async #sendHaves(peer, cores) {
Expand All @@ -454,9 +462,13 @@ export class CoreManager extends TypedEmitter {
return
}

const syncCapabilities = await this.#getSyncCapabilities(peer)

peer.protomux.cork()

for (const { core, namespace } of cores) {
if (!canSync(syncCapabilities, namespace)) continue

// We want ready() rather than update() because we are only interested in local data
await core.ready()
const { discoveryKey } = core
Expand Down Expand Up @@ -640,7 +652,7 @@ function findPeer(core, publicKey, { timeout = 200 } = {}) {

core.on('peer-add', onPeer)

/** @param {any} peer */
/** @param {Peer} peer */
function onPeer(peer) {
if (peer.remotePublicKey.equals(publicKey)) {
clearTimeout(timeoutId)
Expand All @@ -650,3 +662,20 @@ function findPeer(core, publicKey, { timeout = 200 } = {}) {
}
})
}

/**
* @param {Record<Namespace, 'allowed' | 'blocked'>} syncCapabilities
* @param {Namespace} namespace
* @returns {boolean}
*/
function canSync(syncCapabilities, namespace) {
const syncCapability = syncCapabilities[namespace]
switch (syncCapability) {
case 'allowed':
return true
case 'blocked':
return false
default:
throw new ExhaustivenessError(syncCapability)
}
}
8 changes: 8 additions & 0 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
import {
assert,
getDeviceId,
keyToId,
projectKeyToId,
projectKeyToPublicId,
valueOf,
Expand All @@ -51,6 +52,7 @@ import { readConfig } from './config-import.js'
import TranslationApi from './translation-api.js'

/** @typedef {Omit<import('@mapeo/schema').ProjectSettingsValue, 'schemaName'>} EditableProjectSettings */
/** @typedef {(typeof import('./constants.js').NAMESPACES)[number]} Namespace */

const CORESTORE_STORAGE_FOLDER_NAME = 'corestore'
const INDEXER_STORAGE_FOLDER_NAME = 'indexer'
Expand All @@ -76,6 +78,7 @@ export class MapeoProject extends TypedEmitter {
#dataTypes
#blobStore
#coreOwnership
/** @type {Roles} */
#roles
/** @ts-ignore */
#ownershipWriteDone
Expand Down Expand Up @@ -150,6 +153,11 @@ export class MapeoProject extends TypedEmitter {
storage: coreManagerStorage,
db,
logger: this.#l,
getSyncCapabilities: async ({ remotePublicKey }) => {
const peerDeviceId = keyToId(remotePublicKey)
const role = await this.#roles.getRole(peerDeviceId)
return role.sync
},
})

this.#indexWriter = new IndexWriter({
Expand Down
56 changes: 56 additions & 0 deletions tests/core-manager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import test from 'node:test'
import { access, constants } from 'node:fs/promises'
import { pEvent } from 'p-event'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import Hypercore from 'hypercore'
import RAM from 'random-access-memory'
Expand Down Expand Up @@ -390,6 +391,61 @@ test('sends "haves" bitfields over project creator core replication stream', asy
await Promise.all([once(n1, 'close'), once(n2, 'close')])
})

test('only sends "haves" bitfields when recipient is allowed to receive them', async (t) => {
const projectKey = randomBytes(32)
const cm1 = createCoreManager({
projectKey,
getSyncCapabilities: () =>
Promise.resolve({
auth: 'allowed',
config: 'allowed',
data: 'blocked',
blobIndex: 'allowed',
blob: 'blocked',
}),
})
const cm2 = createCoreManager({ projectKey })

const cm2ReceivedHavesForNamespaces = new Set()
const cm2ReceivedHavesPromise = pEvent(cm2, 'peer-have', (namespace) => {
cm2ReceivedHavesForNamespaces.add(namespace)
return (
cm2ReceivedHavesForNamespaces.has('auth') &&
cm2ReceivedHavesForNamespaces.has('config')
)
})

const namespaces = /** @type {const} */ (['auth', 'blob', 'config', 'data'])
await Promise.all(
namespaces.map(async (namespace) => {
const cm1Core = cm1.getWriterCore(namespace).core
await cm1Core.ready()
await cm1Core.append(['block'])
})
)

const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
t.after(async () => {
const closedPromise = Promise.all([once(n1, 'close'), once(n2, 'close')])
n1.destroy()
n2.destroy()
await closedPromise
})

n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)
cm1[kCoreManagerReplicate](n1)
cm2[kCoreManagerReplicate](n2)

await cm2ReceivedHavesPromise

assert.deepEqual(
cm2ReceivedHavesForNamespaces,
new Set(['auth', 'config']),
'only allowed namespace "haves" are sent'
)
})

test('unreplicate', async (t) => {
const WAIT_TIMEOUT = 200
const REPLICATION_DELAY = 20
Expand Down
8 changes: 8 additions & 0 deletions tests/helpers/core-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ export function createCoreManager({
storage: () => new RAM(),
projectKey,
autoDownload: false,
getSyncCapabilities: () =>
Promise.resolve({
auth: 'allowed',
config: 'allowed',
data: 'allowed',
blobIndex: 'allowed',
blob: 'allowed',
}),
...opts,
})
}
Expand Down

0 comments on commit bf7e936

Please sign in to comment.