diff --git a/package.json b/package.json index 974875d..d832a58 100644 --- a/package.json +++ b/package.json @@ -33,8 +33,7 @@ "it-pipe": "^1.0.1", "it-pushable": "^1.2.1", "it-reader": "^1.0.0", - "p-defer": "^3.0.0", - "varint": "^5.0.0" + "p-defer": "^3.0.0" }, "devDependencies": { "aegir": "^20.0.0", @@ -42,6 +41,7 @@ "dirty-chai": "^2.0.1", "it-pair": "^1.0.0", "mocha": "^6.2.0", - "streaming-iterables": "^4.1.0" + "streaming-iterables": "^4.1.0", + "varint": "^5.0.0" } } diff --git a/src/handle.js b/src/handle.js index 5e787c2..e1ebcf0 100644 --- a/src/handle.js +++ b/src/handle.js @@ -1,10 +1,8 @@ 'use strict' const log = require('debug')('it-multistream-select:handle') -const { Buffer } = require('buffer') const BufferList = require('bl') -const Varint = require('varint') -const Multistream = require('./multistream') +const multistream = require('./multistream') const toReaderWriter = require('./to-reader-writer') module.exports = async (stream, protocols) => { @@ -12,27 +10,26 @@ module.exports = async (stream, protocols) => { const { reader, writer, rest } = toReaderWriter(stream) while (true) { - const protocol = (await Multistream.read(reader)).toString() + const protocol = (await multistream.read(reader)).toString() log('read "%s"', protocol) if (protocols.includes(protocol)) { - Multistream.write(writer, protocol) + multistream.write(writer, protocol) log('write "%s" "%s"', protocol, protocol) writer.end() return { stream: rest, protocol } } if (protocol === 'ls') { - // \n\n - Multistream.write(writer, new BufferList([ - Buffer.from(Varint.encode(protocols.length)), - ...protocols.map(p => Multistream.encode(p)) - ]).shallowSlice(0, -1)) + // \n\n\n + multistream.write(writer, new BufferList( + protocols.map(p => multistream.encode(p)) + )) log('write "%s" %s', protocol, protocols) continue } - Multistream.write(writer, 'na') + multistream.write(writer, 'na') log('write "%s" "na"', protocol) } } diff --git a/src/ls.js b/src/ls.js index 8c5bb27..6db9694 100644 --- a/src/ls.js +++ b/src/ls.js @@ -2,40 +2,38 @@ const Reader = require('it-reader') const log = require('debug')('it-multistream-select:ls') -const Varint = require('varint') -const Multistream = require('./multistream') +const multistream = require('./multistream') const toReaderWriter = require('./to-reader-writer') +const lp = require('it-length-prefixed') +const pipe = require('it-pipe') module.exports = async stream => { const { reader, writer, rest } = toReaderWriter(stream) log('write "ls"') - Multistream.write(writer, 'ls') + multistream.write(writer, 'ls') writer.end() // Next message from remote will be (e.g. for 2 protocols): - // \n\n - const res = await Multistream.read(reader) + // \n\n + const res = await multistream.read(reader) // After reading response we have: - // \n - // - // FIXME: Varint.decode expects a Buffer not a BufferList. The .slice is a - // slow copy of the whole message. We could use a proxy? Hacky but works: - // https://github.com/alanshaw/it-length-prefixed/blob/37d9f181ad9b3e272d5c3636f0ae1f7d9fbf738d/src/decode.js#L10-L12 - const totalProtocols = Varint.decode(res.slice()) - log('%s total protocols', totalProtocols) - - // Append \n because there's no final \n at the end of an ls message - // https://github.com/multiformats/go-multistream/issues/41 - const protocolsReader = Reader([res.shallowSlice(Varint.decode.bytes).append('\n')]) + // \n\n + const protocolsReader = Reader([res]) const protocols = [] - for (let i = 0; i < totalProtocols; i++) { - const protocol = await Multistream.read(protocolsReader) - log('read "%s"', protocol) - protocols.push(protocol.toString()) - } + // Decode each of the protocols from the reader + await pipe( + protocolsReader, + lp.decode(), + async source => { + for await (const protocol of source) { + // Remove the newline + protocols.push(protocol.shallowSlice(0, -1).toString()) + } + } + ) return { stream: rest, protocols } } diff --git a/src/multistream.js b/src/multistream.js index 8d929f4..cdc81d3 100644 --- a/src/multistream.js +++ b/src/multistream.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const BufferList = require('bl') -const Lp = require('it-length-prefixed') +const lp = require('it-length-prefixed') const pipe = require('it-pipe') const errCode = require('err-code') @@ -12,7 +12,7 @@ async function oneChunk (source) { for await (const chunk of source) return chunk // We only need one! } -exports.encode = buffer => Lp.encode.single(new BufferList([buffer, NewLine])) +exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine])) exports.write = (writer, buffer) => writer.push(exports.encode(buffer)) @@ -25,7 +25,7 @@ exports.read = async reader => { // Once the length has been parsed, read chunk for that length const onLength = l => { byteLength = l } - const buf = await pipe(varByteSource, Lp.decode({ onLength }), oneChunk) + const buf = await pipe(varByteSource, lp.decode({ onLength }), oneChunk) if (buf.get(buf.length - 1) !== NewLine[0]) { throw errCode(new Error('missing newline'), 'ERR_INVALID_MULTISTREAM_SELECT_MESSAGE') diff --git a/src/select.js b/src/select.js index c8d74a7..bc8cc98 100644 --- a/src/select.js +++ b/src/select.js @@ -2,7 +2,7 @@ const log = require('debug')('it-multistream-select:select') const errCode = require('err-code') -const Multistream = require('./multistream') +const multistream = require('./multistream') const toReaderWriter = require('./to-reader-writer') module.exports = async (stream, protocols) => { @@ -11,8 +11,8 @@ module.exports = async (stream, protocols) => { for (const protocol of protocols) { log('write "%s"', protocol) - Multistream.write(writer, protocol) - const response = (await Multistream.read(reader)).toString() + multistream.write(writer, protocol) + const response = (await multistream.read(reader)).toString() log('read "%s" "%s"', protocol, response) if (response === protocol) { diff --git a/test/dialer.spec.js b/test/dialer.spec.js index 3998f07..260a8a9 100644 --- a/test/dialer.spec.js +++ b/test/dialer.spec.js @@ -10,7 +10,6 @@ const Crypto = require('crypto') const BufferList = require('bl') const Pair = require('it-pair') const Reader = require('it-reader') -const Varint = require('varint') const throwsAsync = require('./helpers/throws-async') const Multistream = require('../src/multistream') const MSS = require('../') @@ -127,10 +126,9 @@ describe('Dialer', () => { expect(msg.toString()).to.eql('ls') // Respond with protocols - yield Multistream.encode(new BufferList([ - Buffer.from(Varint.encode(protocols.length)), - ...protocols.map(p => Multistream.encode(p)) - ]).shallowSlice(0, -1)) + yield Multistream.encode(new BufferList( + protocols.map(p => Multistream.encode(p)) + )) // Third message will be selectedProtocol msg = await Multistream.read(reader) diff --git a/test/listener.spec.js b/test/listener.spec.js index 4d26eff..d888a2c 100644 --- a/test/listener.spec.js +++ b/test/listener.spec.js @@ -9,7 +9,7 @@ const Crypto = require('crypto') const BufferList = require('bl') const Reader = require('it-reader') const { collect } = require('streaming-iterables') -const Varint = require('varint') +const Lp = require('it-length-prefixed') const Multistream = require('../src/multistream') const MSS = require('../') @@ -111,14 +111,20 @@ describe('Listener', () => { // Second message will be ls response msg = await Multistream.read(reader) - const totalProtocols = Varint.decode(msg.slice()) - const lsProtocolsReader = Reader([msg.shallowSlice(Varint.decode.bytes).append('\n')]) + const protocolsReader = Reader([msg]) const lsProtocols = [] - for (let i = 0; i < totalProtocols; i++) { - const protocol = await Multistream.read(lsProtocolsReader) - lsProtocols.push(protocol.toString()) - } + // Decode each of the protocols from the reader + await pipe( + protocolsReader, + Lp.decode(), + async source => { + for await (const protocol of source) { + // Remove the newline + lsProtocols.push(protocol.shallowSlice(0, -1).toString()) + } + } + ) expect(lsProtocols).to.eql(handledProtocols)