From 622063163d7c82867423fb452d3e46cedbf7d614 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Sat, 15 Feb 2020 09:43:12 +0100 Subject: [PATCH] feat: stream to multiaddr connection converter (#2) --- package.json | 11 +++++-- src/stream-to-ma-conn.js | 49 +++++++++++++++++++++++++++++ test/stream-to-ma-conn.spec.js | 56 ++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 src/stream-to-ma-conn.js create mode 100644 test/stream-to-ma-conn.spec.js diff --git a/package.json b/package.json index c75337d..7dcd94c 100644 --- a/package.json +++ b/package.json @@ -29,14 +29,19 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-utils#readme", "devDependencies": { - "aegir": "^20.3.1", + "aegir": "^20.6.0", "chai": "^4.2.0", - "dirty-chai": "^2.0.1" + "dirty-chai": "^2.0.1", + "it-pair": "^1.0.0", + "it-pipe": "^1.1.0", + "streaming-iterables": "^4.1.2" }, "dependencies": { + "abortable-iterator": "^3.0.0", + "debug": "^4.1.1", "err-code": "^2.0.0", "ip-address": "^6.1.0", - "multiaddr": "^7.1.0" + "multiaddr": "^7.3.0" }, "contributors": [ "Vasco Santos " diff --git a/src/stream-to-ma-conn.js b/src/stream-to-ma-conn.js new file mode 100644 index 0000000..d19e960 --- /dev/null +++ b/src/stream-to-ma-conn.js @@ -0,0 +1,49 @@ +'use strict' + +const abortable = require('abortable-iterator') +const log = require('debug')('libp2p:stream:converter') + +// Convert a duplex iterable into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = ({ stream, remoteAddr, localAddr }, options = {}) => { + const { sink, source } = stream + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await sink(source) + } catch (err) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log(err) + } + } + close() + }, + + source: options.signal ? abortable(source, options.signal) : source, + conn: stream, + localAddr, + remoteAddr, + timeline: { open: Date.now() }, + + close () { + sink([]) + close() + } + } + + function close () { + if (!maConn.timeline.close) { + maConn.timeline.close = Date.now() + } + } + + return maConn +} diff --git a/test/stream-to-ma-conn.spec.js b/test/stream-to-ma-conn.spec.js new file mode 100644 index 0000000..9e223b5 --- /dev/null +++ b/test/stream-to-ma-conn.spec.js @@ -0,0 +1,56 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const pair = require('it-pair') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') +const multiaddr = require('multiaddr') + +const streamToMaConn = require('../src/stream-to-ma-conn') + +describe('Convert stream into a multiaddr connection', () => { + it('converts a stream and adds the provided metadata', () => { + const stream = pair() + const localAddr = multiaddr('/ip4/101.45.75.219/tcp/6000') + const remoteAddr = multiaddr('/ip4/100.46.74.201/tcp/6002') + + const maConn = streamToMaConn({ + stream, + localAddr, + remoteAddr + }) + + expect(maConn).to.exist() + expect(maConn.sink).to.exist() + expect(maConn.source).to.exist() + expect(maConn.localAddr).to.eql(localAddr) + expect(maConn.remoteAddr).to.eql(remoteAddr) + expect(maConn.timeline).to.exist() + expect(maConn.timeline.open).to.exist() + expect(maConn.timeline.close).to.not.exist() + + maConn.close() + expect(maConn.timeline.close).to.exist() + }) + + it('can stream data over the multiaddr connection', async () => { + const stream = pair() + const maConn = streamToMaConn({ stream }) + + const data = 'hey' + const streamData = await pipe( + [data], + maConn, + collect + ) + + expect(streamData).to.eql([data]) + // underlying stream end closes the connection + expect(maConn.timeline.close).to.exist() + }) +})