Skip to content
This repository has been archived by the owner on Oct 10, 2019. It is now read-only.

Update LS to match the latest spec #5

Merged
merged 3 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
"it-pipe": "^1.0.1",
"it-pushable": "^1.2.1",
"it-reader": "^1.0.0",
"p-defer": "^3.0.0",
"varint": "^5.0.0"
"p-defer": "^3.0.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"mocha": "^6.2.0",
"streaming-iterables": "^4.1.0"
"streaming-iterables": "^4.1.0",
"varint": "^5.0.0"
}
}
19 changes: 8 additions & 11 deletions src/handle.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
'use strict'

const log = require('debug')('it-multistream-select:handle')
const { Buffer } = require('buffer')
const BufferList = require('bl')
const Varint = require('varint')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')

module.exports = async (stream, protocols) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)

while (true) {
const protocol = (await Multistream.read(reader)).toString()
const protocol = (await multistream.read(reader)).toString()
log('read "%s"', protocol)

if (protocols.includes(protocol)) {
Multistream.write(writer, protocol)
multistream.write(writer, protocol)
log('write "%s" "%s"', protocol, protocol)
writer.end()
return { stream: rest, protocol }
}

if (protocol === 'ls') {
// <varint-msg-len><varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
Multistream.write(writer, new BufferList([
Buffer.from(Varint.encode(protocols.length)),
...protocols.map(p => Multistream.encode(p))
]).shallowSlice(0, -1))
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n
multistream.write(writer, new BufferList(
protocols.map(p => multistream.encode(p))
))
log('write "%s" %s', protocol, protocols)
continue
}

Multistream.write(writer, 'na')
multistream.write(writer, 'na')
log('write "%s" "na"', protocol)
}
}
40 changes: 19 additions & 21 deletions src/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,38 @@

const Reader = require('it-reader')
const log = require('debug')('it-multistream-select:ls')
const Varint = require('varint')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

module.exports = async stream => {
const { reader, writer, rest } = toReaderWriter(stream)

log('write "ls"')
Multistream.write(writer, 'ls')
multistream.write(writer, 'ls')
writer.end()

// Next message from remote will be (e.g. for 2 protocols):
// <varint-msg-len><varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const res = await Multistream.read(reader)
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const res = await multistream.read(reader)

// After reading response we have:
// <varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>
//
// FIXME: Varint.decode expects a Buffer not a BufferList. The .slice is a
// slow copy of the whole message. We could use a proxy? Hacky but works:
// https://github.com/alanshaw/it-length-prefixed/blob/37d9f181ad9b3e272d5c3636f0ae1f7d9fbf738d/src/decode.js#L10-L12
const totalProtocols = Varint.decode(res.slice())
log('%s total protocols', totalProtocols)

// Append \n because there's no final \n at the end of an ls message
// https://github.com/multiformats/go-multistream/issues/41
const protocolsReader = Reader([res.shallowSlice(Varint.decode.bytes).append('\n')])
// <varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const protocolsReader = Reader([res])
const protocols = []

for (let i = 0; i < totalProtocols; i++) {
const protocol = await Multistream.read(protocolsReader)
log('read "%s"', protocol)
protocols.push(protocol.toString())
}
// Decode each of the protocols from the reader
await pipe(
protocolsReader,
lp.decode(),
async source => {
for await (const protocol of source) {
// Remove the newline
protocols.push(protocol.shallowSlice(0, -1).toString())
}
}
)

return { stream: rest, protocols }
}
6 changes: 3 additions & 3 deletions src/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { Buffer } = require('buffer')
const BufferList = require('bl')
const Lp = require('it-length-prefixed')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')
const errCode = require('err-code')

Expand All @@ -12,7 +12,7 @@ async function oneChunk (source) {
for await (const chunk of source) return chunk // We only need one!
}

exports.encode = buffer => Lp.encode.single(new BufferList([buffer, NewLine]))
exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine]))

exports.write = (writer, buffer) => writer.push(exports.encode(buffer))

Expand All @@ -25,7 +25,7 @@ exports.read = async reader => {

// Once the length has been parsed, read chunk for that length
const onLength = l => { byteLength = l }
const buf = await pipe(varByteSource, Lp.decode({ onLength }), oneChunk)
const buf = await pipe(varByteSource, lp.decode({ onLength }), oneChunk)

if (buf.get(buf.length - 1) !== NewLine[0]) {
throw errCode(new Error('missing newline'), 'ERR_INVALID_MULTISTREAM_SELECT_MESSAGE')
Expand Down
6 changes: 3 additions & 3 deletions src/select.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const log = require('debug')('it-multistream-select:select')
const errCode = require('err-code')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')

module.exports = async (stream, protocols) => {
Expand All @@ -11,8 +11,8 @@ module.exports = async (stream, protocols) => {

for (const protocol of protocols) {
log('write "%s"', protocol)
Multistream.write(writer, protocol)
const response = (await Multistream.read(reader)).toString()
multistream.write(writer, protocol)
const response = (await multistream.read(reader)).toString()
log('read "%s" "%s"', protocol, response)

if (response === protocol) {
Expand Down
8 changes: 3 additions & 5 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const Crypto = require('crypto')
const BufferList = require('bl')
const Pair = require('it-pair')
const Reader = require('it-reader')
const Varint = require('varint')
const throwsAsync = require('./helpers/throws-async')
const Multistream = require('../src/multistream')
const MSS = require('../')
Expand Down Expand Up @@ -127,10 +126,9 @@ describe('Dialer', () => {
expect(msg.toString()).to.eql('ls')

// Respond with protocols
yield Multistream.encode(new BufferList([
Buffer.from(Varint.encode(protocols.length)),
...protocols.map(p => Multistream.encode(p))
]).shallowSlice(0, -1))
yield Multistream.encode(new BufferList(
protocols.map(p => Multistream.encode(p))
))

// Third message will be selectedProtocol
msg = await Multistream.read(reader)
Expand Down
20 changes: 13 additions & 7 deletions test/listener.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const Crypto = require('crypto')
const BufferList = require('bl')
const Reader = require('it-reader')
const { collect } = require('streaming-iterables')
const Varint = require('varint')
const Lp = require('it-length-prefixed')
const Multistream = require('../src/multistream')
const MSS = require('../')

Expand Down Expand Up @@ -111,14 +111,20 @@ describe('Listener', () => {
// Second message will be ls response
msg = await Multistream.read(reader)

const totalProtocols = Varint.decode(msg.slice())
const lsProtocolsReader = Reader([msg.shallowSlice(Varint.decode.bytes).append('\n')])
const protocolsReader = Reader([msg])
const lsProtocols = []

for (let i = 0; i < totalProtocols; i++) {
const protocol = await Multistream.read(lsProtocolsReader)
lsProtocols.push(protocol.toString())
}
// Decode each of the protocols from the reader
await pipe(
protocolsReader,
Lp.decode(),
async source => {
for await (const protocol of source) {
// Remove the newline
lsProtocols.push(protocol.shallowSlice(0, -1).toString())
}
}
)

expect(lsProtocols).to.eql(handledProtocols)

Expand Down