Skip to content
This repository has been archived by the owner on Oct 10, 2019. It is now read-only.

Commit

Permalink
refactor: update LS to match the latest spec (#5)
Browse files Browse the repository at this point in the history
Multistream spec: multiformats/multistream-select#19

fixes #4
  • Loading branch information
jacobheun authored and Alan Shaw committed Sep 10, 2019
1 parent 7e96d4a commit 03714cb
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 53 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
"it-pipe": "^1.0.1",
"it-pushable": "^1.2.1",
"it-reader": "^1.0.0",
"p-defer": "^3.0.0",
"varint": "^5.0.0"
"p-defer": "^3.0.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"mocha": "^6.2.0",
"streaming-iterables": "^4.1.0"
"streaming-iterables": "^4.1.0",
"varint": "^5.0.0"
}
}
19 changes: 8 additions & 11 deletions src/handle.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
'use strict'

const log = require('debug')('it-multistream-select:handle')
const { Buffer } = require('buffer')
const BufferList = require('bl')
const Varint = require('varint')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')

module.exports = async (stream, protocols) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)

while (true) {
const protocol = (await Multistream.read(reader)).toString()
const protocol = (await multistream.read(reader)).toString()
log('read "%s"', protocol)

if (protocols.includes(protocol)) {
Multistream.write(writer, protocol)
multistream.write(writer, protocol)
log('write "%s" "%s"', protocol, protocol)
writer.end()
return { stream: rest, protocol }
}

if (protocol === 'ls') {
// <varint-msg-len><varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
Multistream.write(writer, new BufferList([
Buffer.from(Varint.encode(protocols.length)),
...protocols.map(p => Multistream.encode(p))
]).shallowSlice(0, -1))
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n
multistream.write(writer, new BufferList(
protocols.map(p => multistream.encode(p))
))
log('write "%s" %s', protocol, protocols)
continue
}

Multistream.write(writer, 'na')
multistream.write(writer, 'na')
log('write "%s" "na"', protocol)
}
}
40 changes: 19 additions & 21 deletions src/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,38 @@

const Reader = require('it-reader')
const log = require('debug')('it-multistream-select:ls')
const Varint = require('varint')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

module.exports = async stream => {
const { reader, writer, rest } = toReaderWriter(stream)

log('write "ls"')
Multistream.write(writer, 'ls')
multistream.write(writer, 'ls')
writer.end()

// Next message from remote will be (e.g. for 2 protocols):
// <varint-msg-len><varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const res = await Multistream.read(reader)
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const res = await multistream.read(reader)

// After reading response we have:
// <varint-num-protos><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>
//
// FIXME: Varint.decode expects a Buffer not a BufferList. The .slice is a
// slow copy of the whole message. We could use a proxy? Hacky but works:
// https://github.com/alanshaw/it-length-prefixed/blob/37d9f181ad9b3e272d5c3636f0ae1f7d9fbf738d/src/decode.js#L10-L12
const totalProtocols = Varint.decode(res.slice())
log('%s total protocols', totalProtocols)

// Append \n because there's no final \n at the end of an ls message
// https://github.com/multiformats/go-multistream/issues/41
const protocolsReader = Reader([res.shallowSlice(Varint.decode.bytes).append('\n')])
// <varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
const protocolsReader = Reader([res])
const protocols = []

for (let i = 0; i < totalProtocols; i++) {
const protocol = await Multistream.read(protocolsReader)
log('read "%s"', protocol)
protocols.push(protocol.toString())
}
// Decode each of the protocols from the reader
await pipe(
protocolsReader,
lp.decode(),
async source => {
for await (const protocol of source) {
// Remove the newline
protocols.push(protocol.shallowSlice(0, -1).toString())
}
}
)

return { stream: rest, protocols }
}
6 changes: 3 additions & 3 deletions src/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { Buffer } = require('buffer')
const BufferList = require('bl')
const Lp = require('it-length-prefixed')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')
const errCode = require('err-code')

Expand All @@ -12,7 +12,7 @@ async function oneChunk (source) {
for await (const chunk of source) return chunk // We only need one!
}

exports.encode = buffer => Lp.encode.single(new BufferList([buffer, NewLine]))
exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine]))

exports.write = (writer, buffer) => writer.push(exports.encode(buffer))

Expand All @@ -25,7 +25,7 @@ exports.read = async reader => {

// Once the length has been parsed, read chunk for that length
const onLength = l => { byteLength = l }
const buf = await pipe(varByteSource, Lp.decode({ onLength }), oneChunk)
const buf = await pipe(varByteSource, lp.decode({ onLength }), oneChunk)

if (buf.get(buf.length - 1) !== NewLine[0]) {
throw errCode(new Error('missing newline'), 'ERR_INVALID_MULTISTREAM_SELECT_MESSAGE')
Expand Down
6 changes: 3 additions & 3 deletions src/select.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const log = require('debug')('it-multistream-select:select')
const errCode = require('err-code')
const Multistream = require('./multistream')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')

module.exports = async (stream, protocols) => {
Expand All @@ -11,8 +11,8 @@ module.exports = async (stream, protocols) => {

for (const protocol of protocols) {
log('write "%s"', protocol)
Multistream.write(writer, protocol)
const response = (await Multistream.read(reader)).toString()
multistream.write(writer, protocol)
const response = (await multistream.read(reader)).toString()
log('read "%s" "%s"', protocol, response)

if (response === protocol) {
Expand Down
8 changes: 3 additions & 5 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const Crypto = require('crypto')
const BufferList = require('bl')
const Pair = require('it-pair')
const Reader = require('it-reader')
const Varint = require('varint')
const throwsAsync = require('./helpers/throws-async')
const Multistream = require('../src/multistream')
const MSS = require('../')
Expand Down Expand Up @@ -127,10 +126,9 @@ describe('Dialer', () => {
expect(msg.toString()).to.eql('ls')

// Respond with protocols
yield Multistream.encode(new BufferList([
Buffer.from(Varint.encode(protocols.length)),
...protocols.map(p => Multistream.encode(p))
]).shallowSlice(0, -1))
yield Multistream.encode(new BufferList(
protocols.map(p => Multistream.encode(p))
))

// Third message will be selectedProtocol
msg = await Multistream.read(reader)
Expand Down
20 changes: 13 additions & 7 deletions test/listener.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const Crypto = require('crypto')
const BufferList = require('bl')
const Reader = require('it-reader')
const { collect } = require('streaming-iterables')
const Varint = require('varint')
const Lp = require('it-length-prefixed')
const Multistream = require('../src/multistream')
const MSS = require('../')

Expand Down Expand Up @@ -111,14 +111,20 @@ describe('Listener', () => {
// Second message will be ls response
msg = await Multistream.read(reader)

const totalProtocols = Varint.decode(msg.slice())
const lsProtocolsReader = Reader([msg.shallowSlice(Varint.decode.bytes).append('\n')])
const protocolsReader = Reader([msg])
const lsProtocols = []

for (let i = 0; i < totalProtocols; i++) {
const protocol = await Multistream.read(lsProtocolsReader)
lsProtocols.push(protocol.toString())
}
// Decode each of the protocols from the reader
await pipe(
protocolsReader,
Lp.decode(),
async source => {
for await (const protocol of source) {
// Remove the newline
lsProtocols.push(protocol.shallowSlice(0, -1).toString())
}
}
)

expect(lsProtocols).to.eql(handledProtocols)

Expand Down

0 comments on commit 03714cb

Please sign in to comment.