From 461224d8b0e81af324c7c0c0d652fae1668d6301 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Tue, 19 Oct 2021 23:30:37 +0200 Subject: [PATCH] fix(pubsub): new wire format in http rpc Implements changes from https://github.com/ipfs/go-ipfs/pull/8183 --- .../src/lib/http-rpc-wire-format.js | 64 +++++++++++++++++++ packages/ipfs-http-client/src/pubsub/ls.js | 4 +- .../ipfs-http-client/src/pubsub/publish.js | 4 +- .../ipfs-http-client/src/pubsub/subscribe.js | 16 ++--- .../ipfs-http-client/test/utils/factory.js | 2 +- .../src/api/resources/pubsub.js | 40 ++++++++---- 6 files changed, 102 insertions(+), 28 deletions(-) create mode 100644 packages/ipfs-http-client/src/lib/http-rpc-wire-format.js diff --git a/packages/ipfs-http-client/src/lib/http-rpc-wire-format.js b/packages/ipfs-http-client/src/lib/http-rpc-wire-format.js new file mode 100644 index 0000000000..82b541dc35 --- /dev/null +++ b/packages/ipfs-http-client/src/lib/http-rpc-wire-format.js @@ -0,0 +1,64 @@ +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { base64url } from 'multiformats/bases/base64' +import PeerId from 'peer-id' + +/* HTTP RPC: + * - sends PeerIds as Base58btc (legacy) or CIDv1 + * - wraps binary data in multibase. base64url is used to avoid issues + * when a binary data is passed as search param in URL. + * Historical context: https://github.com/ipfs/go-ipfs/issues/7939 + * Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183 + */ + +/** + * @param {Array} strings + * @returns {Array} strings + */ +const mbToTextArray = strings => { + if (Array.isArray(strings)) { + return strings.map(mbToText) + } + return strings +} + +/** + * @param {string} mb + * @returns {string} + */ +const mbToText = mb => uint8ArrayToString(mbToBytes(mb)) + +/** + * @param {string} mb + * @returns {Uint8Array} + */ +const mbToBytes = mb => base64url.decode(mb) + +/** + * @param {string} text + * @returns {string} + */ +const toUrlSafeBase = text => base64url.encode(uint8ArrayFromString(text)) + +/** + * Ensure uniform Peer ID representation in text + * + * @param {Array} peerids + * @returns {Array} peerids + */ +const normalizePeerIds = peerids => { + if (Array.isArray(peerids)) { + return peerids.map(normalizePeerId) + } + return peerids +} + +/** + * Ensure uniform Peer ID representation in text + * + * @param {string} peerid + * @returns {string} + */ +const normalizePeerId = peerid => PeerId.parse(peerid).toB58String() // TODO: toString() when go-ipfs switch to CIDv1 + +export { mbToTextArray, mbToText, mbToBytes, toUrlSafeBase, normalizePeerIds, normalizePeerId } diff --git a/packages/ipfs-http-client/src/pubsub/ls.js b/packages/ipfs-http-client/src/pubsub/ls.js index 49853273ab..caa34e697e 100644 --- a/packages/ipfs-http-client/src/pubsub/ls.js +++ b/packages/ipfs-http-client/src/pubsub/ls.js @@ -1,5 +1,6 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { mbToTextArray } from '../lib/http-rpc-wire-format.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -17,8 +18,7 @@ export const createLs = configure(api => { headers: options.headers })).json() - // TODO: unwrap topic names from multibase - return Strings || [] + return mbToTextArray(Strings) || [] } return ls }) diff --git a/packages/ipfs-http-client/src/pubsub/publish.js b/packages/ipfs-http-client/src/pubsub/publish.js index ebf6b565e4..d5258fb282 100644 --- a/packages/ipfs-http-client/src/pubsub/publish.js +++ b/packages/ipfs-http-client/src/pubsub/publish.js @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' import { multipartRequest } from 'ipfs-core-utils/multipart-request' import { abortSignal } from '../lib/abort-signal.js' +import { toUrlSafeBase } from '../lib/http-rpc-wire-format.js' import { AbortController } from 'native-abort-controller' /** @@ -14,9 +15,8 @@ export const createPublish = configure(api => { * @type {PubsubAPI["publish"]} */ async function publish (topic, data, options = {}) { - // TODO: wrap topic in multibase const searchParams = toUrlSearchParams({ - arg: topic, + arg: toUrlSafeBase(topic), ...options }) diff --git a/packages/ipfs-http-client/src/pubsub/subscribe.js b/packages/ipfs-http-client/src/pubsub/subscribe.js index 621975a8a0..b8dcb74d1b 100644 --- a/packages/ipfs-http-client/src/pubsub/subscribe.js +++ b/packages/ipfs-http-client/src/pubsub/subscribe.js @@ -1,8 +1,7 @@ -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import debug from 'debug' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { toUrlSafeBase, mbToTextArray, mbToBytes } from '../lib/http-rpc-wire-format.js' const log = debug('ipfs-http-client:pubsub:subscribe') /** @@ -39,12 +38,11 @@ export const createSubscribe = (options, subsTracker) => { // is received. If this doesn't happen within 1 second assume success const ffWorkaround = setTimeout(() => done(), 1000) - // TODO: wrap topic in multibase // Do this async to not block Firefox api.post('pubsub/sub', { signal: options.signal, searchParams: toUrlSearchParams({ - arg: topic, + arg: toUrlSafeBase(topic), ...options }), headers: options.headers @@ -95,13 +93,11 @@ async function readMessages (response, { onMessage, onEnd, onError }) { continue } - // TODO: multibase data, seqno and topics - // TODO: parse string and get peerid bytes using libp2p lib onMessage({ - from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'), - data: uint8ArrayFromString(msg.data, 'base64pad'), - seqno: uint8ArrayFromString(msg.seqno, 'base64pad'), - topicIDs: msg.topicIDs + from: msg.from, + data: mbToBytes(msg.data), + seqno: mbToBytes(msg.seqno), + topicIDs: mbToTextArray(msg.topicIDs) }) } catch (/** @type {any} */ err) { err.message = `Failed to parse pubsub message: ${err.message}` diff --git a/packages/ipfs-http-client/test/utils/factory.js b/packages/ipfs-http-client/test/utils/factory.js index 012137867c..401b791e1f 100644 --- a/packages/ipfs-http-client/test/utils/factory.js +++ b/packages/ipfs-http-client/test/utils/factory.js @@ -16,7 +16,7 @@ const commonOptions = { const commonOverrides = { go: { - ipfsBin: isNode ? path() : undefined + ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined } } diff --git a/packages/ipfs-http-server/src/api/resources/pubsub.js b/packages/ipfs-http-server/src/api/resources/pubsub.js index 18a140494a..e3f450f5f1 100644 --- a/packages/ipfs-http-server/src/api/resources/pubsub.js +++ b/packages/ipfs-http-server/src/api/resources/pubsub.js @@ -6,6 +6,22 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { streamResponse } from '../../utils/stream-response.js' import pushable from 'it-pushable' +import { base64url } from 'multiformats/bases/base64' + +const preDecodeTopicFromHttpRpc = { + assign: 'topic', + /** + * @param {import('../../types').Request} request + * @param {import('@hapi/hapi').ResponseToolkit} _h + */ + method: async (request, _h) => { + try { + return uint8ArrayToString(base64url.decode(request.query.topic)) + } catch (/** @type {any} */ err) { + throw Boom.boomify(err, { message: `Failed to decode topic from HTTP RPC form ${request.query.topic}` }) + } + } +} export const subscribeResource = { options: { @@ -24,7 +40,8 @@ export const subscribeResource = { override: true, ignoreUndefined: true }) - } + }, + pre: [preDecodeTopicFromHttpRpc] }, /** * @param {import('../../types').Request} request @@ -56,13 +73,11 @@ export const subscribeResource = { * @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ const handler = (msg) => { - // TODO: data, seqno and topicIDs in multibase - // TODO: from should use canonical toString from peerid libp2p lib output.push({ - from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'), - data: uint8ArrayToString(msg.data, 'base64pad'), - seqno: uint8ArrayToString(msg.seqno, 'base64pad'), - topicIDs: msg.topicIDs + from: msg.from, // TODO: switch to PeerId.parse(msg.from).toString() when go-ipfs defaults to CIDv1 + data: base64url.encode(msg.data), + seqno: base64url.encode(msg.seqno), + topicIDs: msg.topicIDs.map(t => base64url.encode(uint8ArrayFromString(t))) }) } @@ -92,7 +107,7 @@ export const publishResource = { parse: false, output: 'stream' }, - pre: [{ + pre: [preDecodeTopicFromHttpRpc, { assign: 'data', /** * @param {import('../../types').Request} request @@ -158,7 +173,7 @@ export const publishResource = { } = request try { - // TODO: unwrap topic from multibase? + // TODO: move to reusable pre handler? await ipfs.pubsub.publish(topic, data, { signal, timeout @@ -212,8 +227,7 @@ export const lsResource = { throw Boom.boomify(err, { message: 'Failed to list subscriptions' }) } - // TODO: multibase topic names in Strings array - return h.response({ Strings: subscriptions }) + return h.response({ Strings: subscriptions.map(s => base64url.encode(uint8ArrayFromString(s))) }) } } @@ -232,7 +246,8 @@ export const peersResource = { override: true, ignoreUndefined: true }) - } + }, + pre: [preDecodeTopicFromHttpRpc] }, /** * @param {import('../../types').Request} request @@ -256,7 +271,6 @@ export const peersResource = { let peers try { - // TODO: unwrap topic from multibase peers = await ipfs.pubsub.peers(topic, { signal, timeout