Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix(pubsub): multibase in pubsub http rpc #3922

Merged
merged 15 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions packages/ipfs-cli/test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('pubsub', () => {
timeout: undefined
}

it('should list toic peers', async () => {
it('should list topic peers', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -79,7 +79,7 @@ describe('pubsub', () => {
expect(out).to.equal(`${peer}\n`)
})

it('should list toic peers with a timeout', async () => {
it('should list topic peers with a timeout', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -101,19 +101,19 @@ describe('pubsub', () => {
}

it('should publish message', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-1'
const data = 'data\r\nfirst\nZażółć gęślą jaźń😇'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yellow goose self?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds better in Polish ;) (it is a valid sentence that includes all polish diacritics, good for testing useful utf8)


await cli(`pubsub pub ${subName} ${data}`, { ipfs })
await cli(`pubsub pub ${subName} "${data}"`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), defaultOptions)).to.be.true()
})

it('should publish message with timeout', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-2'
const data = 'data\r\nsecond\nZażółć gęślą jaźń😇'

await cli(`pubsub pub ${subName} ${data} --timeout=1s`, { ipfs })
await cli(`pubsub pub ${subName} "${data}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), {
...defaultOptions,
Expand All @@ -128,17 +128,17 @@ describe('pubsub', () => {
}

it('should subscribe', async () => {
const subName = 'sub-name'
const subName = 'sub\nname'

await cli(`pubsub sub ${subName}`, { ipfs })
await cli(`pubsub sub "${subName}"`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, defaultOptions)).to.be.true()
})

it('should subscribe with a timeout', async () => {
const subName = 'sub-name'

await cli(`pubsub sub ${subName} --timeout=1s`, { ipfs })
await cli(`pubsub sub "${subName}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, {
...defaultOptions,
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
"@types/rimraf": "^3.0.1",
"aegir": "^36.0.1",
"delay": "^5.0.0",
"go-ipfs": "0.9.1",
"go-ipfs": "0.11.0-rc1",
"interface-blockstore-tests": "^2.0.1",
"interface-ipfs-core": "^0.152.2",
"ipfsd-ctl": "^10.0.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"devDependencies": {
"aegir": "^36.0.1",
"delay": "^5.0.0",
"go-ipfs": "0.9.1",
"go-ipfs": "0.11.0-rc1",
"ipfsd-ctl": "^10.0.4",
"it-all": "^1.0.4",
"it-first": "^1.0.4",
Expand Down
41 changes: 41 additions & 0 deletions packages/ipfs-http-client/src/lib/http-rpc-wire-format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { base64url } from 'multiformats/bases/base64'

/* HTTP RPC:
* - wraps binary data in multibase. base64url is used to avoid issues
* when a binary data is passed as search param in URL.
* Historical context: https://github.com/ipfs/go-ipfs/issues/7939
* Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183
*/

/**
* @param {Array<string>} strings
* @returns {Array<string>} strings
*/
const rpcArrayToTextArray = strings => {
if (Array.isArray(strings)) {
return strings.map(rpcToText)
}
return strings
}

/**
* @param {string} mb
* @returns {string}
*/
const rpcToText = mb => uint8ArrayToString(rpcToBytes(mb))

/**
* @param {string} mb
* @returns {Uint8Array}
*/
const rpcToBytes = mb => base64url.decode(mb)

/**
* @param {string} text
* @returns {string}
*/
const textToUrlSafeRpc = text => base64url.encode(uint8ArrayFromString(text))

export { rpcArrayToTextArray, rpcToText, rpcToBytes, textToUrlSafeRpc }
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { rpcArrayToTextArray } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -17,7 +18,7 @@ export const createLs = configure(api => {
headers: options.headers
})).json()

return Strings || []
return rpcArrayToTextArray(Strings) || []
}
return ls
})
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -14,7 +15,7 @@ export const createPeers = configure(api => {
const res = await api.post('pubsub/peers', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'
import { AbortController } from 'native-abort-controller'

/**
Expand All @@ -15,7 +16,7 @@ export const createPublish = configure(api => {
*/
async function publish (topic, data, options = {}) {
const searchParams = toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
})

Expand Down
13 changes: 6 additions & 7 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import debug from 'debug'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc, rpcArrayToTextArray, rpcToBytes } from '../lib/http-rpc-wire-format.js'
const log = debug('ipfs-http-client:pubsub:subscribe')

/**
Expand Down Expand Up @@ -43,7 +42,7 @@ export const createSubscribe = (options, subsTracker) => {
api.post('pubsub/sub', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down Expand Up @@ -95,10 +94,10 @@ async function readMessages (response, { onMessage, onEnd, onError }) {
}

onMessage({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'),
data: uint8ArrayFromString(msg.data, 'base64pad'),
seqno: uint8ArrayFromString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from,
data: rpcToBytes(msg.data),
seqno: rpcToBytes(msg.seqno),
topicIDs: rpcArrayToTextArray(msg.topicIDs)
})
} catch (/** @type {any} */ err) {
err.message = `Failed to parse pubsub message: ${err.message}`
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/test/utils/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const commonOptions = {

const commonOverrides = {
go: {
ipfsBin: isNode ? path() : undefined
ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined
}
}

Expand Down
44 changes: 32 additions & 12 deletions packages/ipfs-http-server/src/api/resources/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { streamResponse } from '../../utils/stream-response.js'
import pushable from 'it-pushable'
import { base64url } from 'multiformats/bases/base64'

const preDecodeTopicFromHttpRpc = {
assign: 'topic',
/**
* @param {import('../../types').Request} request
* @param {import('@hapi/hapi').ResponseToolkit} _h
*/
method: async (request, _h) => {
try {
return uint8ArrayToString(base64url.decode(request.query.topic))
} catch (/** @type {any} */ err) {
throw Boom.boomify(err, { message: `Failed to decode topic from HTTP RPC form ${request.query.topic}` })
}
}
}

export const subscribeResource = {
options: {
Expand All @@ -24,7 +40,8 @@ export const subscribeResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand All @@ -40,8 +57,8 @@ export const subscribeResource = {
ipfs
}
},
query: {
topic
pre: {
topic // decoded version created by preDecodeTopicFromHttpRpc
}
} = request

Expand All @@ -57,10 +74,10 @@ export const subscribeResource = {
*/
const handler = (msg) => {
output.push({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'),
data: uint8ArrayToString(msg.data, 'base64pad'),
seqno: uint8ArrayToString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from, // TODO: switch to PeerId.parse(msg.from).toString() when go-ipfs defaults to CIDv1
data: base64url.encode(msg.data),
seqno: base64url.encode(msg.seqno),
topicIDs: msg.topicIDs.map(t => base64url.encode(uint8ArrayFromString(t)))
})
}

Expand Down Expand Up @@ -90,7 +107,7 @@ export const publishResource = {
parse: false,
output: 'stream'
},
pre: [{
pre: [preDecodeTopicFromHttpRpc, {
assign: 'data',
/**
* @param {import('../../types').Request} request
Expand Down Expand Up @@ -147,10 +164,10 @@ export const publishResource = {
}
},
pre: {
topic,
data
},
query: {
topic,
timeout
}
} = request
Expand Down Expand Up @@ -209,7 +226,7 @@ export const lsResource = {
throw Boom.boomify(err, { message: 'Failed to list subscriptions' })
}

return h.response({ Strings: subscriptions })
return h.response({ Strings: subscriptions.map(s => base64url.encode(uint8ArrayFromString(s))) })
}
}

Expand All @@ -228,7 +245,8 @@ export const peersResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand All @@ -244,8 +262,10 @@ export const peersResource = {
ipfs
}
},
pre: {
topic
},
query: {
topic,
timeout
}
} = request
Expand Down
6 changes: 3 additions & 3 deletions packages/ipfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"test:interface:client": "aegir test -f test/interface-client.js",
"test:interface:http-js": "aegir test -f test/interface-http-js.js",
"test:interface:http-go": "aegir test -f test/interface-http-go.js",
"test:interop": "cross-env IPFS_JS_EXEC=$PWD/src/cli.js IPFS_JS_MODULE=$PWD/dist IPFS_JS_HTTP_MODULE=$PWD/node_modules/ipfs-http-client IPFS_REUSEPORT=false ipfs-interop",
"test:interop": "cross-env IPFS_JS_EXEC=$PWD/src/cli.js IPFS_JS_MODULE=$PWD/dist IPFS_JS_HTTP_MODULE=$PWD/node_modules/ipfs-http-client LIBP2P_TCP_REUSEPORT=false ipfs-interop",
"test:external": "aegir test-dependant",
"clean": "rimraf ./dist",
"dep-check": "aegir dep-check -i ipfs-core-types -i @types/* -i npm-run-all -i copyfiles"
Expand All @@ -84,12 +84,12 @@
"copyfiles": "^2.4.1",
"cross-env": "^7.0.0",
"electron-webrtc": "^0.3.0",
"go-ipfs": "0.9.1",
"go-ipfs": "0.11.0-rc1",
"interface-ipfs-core": "^0.152.2",
"ipfs-client": "^0.7.4",
"ipfs-core-types": "^0.8.4",
"ipfs-http-client": "^54.0.2",
"ipfs-interop": "^7.0.2",
"ipfs-interop": "ipfs/interop#feat/pubsub-require-multibase",
"ipfs-utils": "^9.0.2",
"ipfsd-ctl": "^10.0.4",
"iso-url": "^1.0.0",
Expand Down