diff --git a/package.json b/package.json index 62cd997..188e79c 100644 --- a/package.json +++ b/package.json @@ -33,21 +33,23 @@ "npm": ">=3.0.0" }, "devDependencies": { - "aegir": "^20.3.1", + "aegir": "^20.4.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "^0.7.0", - "sinon": "^7.3.1" + "libp2p-interfaces": "^0.1.6", + "it-pipe": "^1.1.0", + "sinon": "^7.5.0", + "streaming-iterables": "^4.1.1" }, "dependencies": { - "abortable-iterator": "^2.1.0", + "abortable-iterator": "^3.0.0", "class-is": "^1.1.0", "debug": "^4.1.1", "err-code": "^2.0.0", "libp2p-utils": "~0.1.0", "mafmt": "^7.0.0", - "multiaddr": "^7.1.0", - "stream-to-it": "^0.1.1" + "multiaddr": "^7.2.1", + "stream-to-it": "^0.2.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index 56eabac..a4bc858 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ const errCode = require('err-code') const log = require('debug')('libp2p:tcp') const toConnection = require('./socket-to-conn') const createListener = require('./listener') +const { multiaddrToNetConfig } = require('./utils') const { AbortError } = require('abortable-iterator') const { CODE_CIRCUIT, CODE_P2P } = require('./constants') const assert = require('assert') @@ -56,9 +57,9 @@ class TCP { return new Promise((resolve, reject) => { const start = Date.now() - const cOpts = ma.toOptions() + const cOpts = multiaddrToNetConfig(ma) - log('dialing %s:%s', cOpts.host, cOpts.port) + log('dialing %j', cOpts) const rawSocket = net.connect(cOpts) const onError = err => { @@ -74,12 +75,12 @@ class TCP { } const onConnect = () => { - log('connection opened %s:%s', cOpts.host, cOpts.port) + log('connection opened %j', cOpts) done() } const onAbort = () => { - log('connection aborted %s:%s', cOpts.host, cOpts.port) + log('connection aborted %j', cOpts) rawSocket.destroy() done(new AbortError()) } diff --git a/src/listener.js b/src/listener.js index 975eb93..91269c5 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,13 +1,14 @@ 'use strict' -const multiaddr = require('multiaddr') -const os = require('os') const net = require('net') const EventEmitter = require('events') const log = require('debug')('libp2p:tcp:listener') const toConnection = require('./socket-to-conn') const { CODE_P2P } = require('./constants') -const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } +const { + getMultiaddrs, + multiaddrToNetConfig +} = require('./utils') module.exports = ({ handler, upgrader }, options) => { const listener = new EventEmitter() @@ -16,7 +17,7 @@ module.exports = ({ handler, upgrader }, options) => { // Avoid uncaught errors caused by unstable connections socket.on('error', err => log('socket error', err)) - const maConn = toConnection(socket) + const maConn = toConnection(socket, { listeningAddr }) log('new inbound connection %s', maConn.remoteAddr) const conn = await upgrader.upgradeInbound(maConn) @@ -56,8 +57,8 @@ module.exports = ({ handler, upgrader }, options) => { } return new Promise((resolve, reject) => { - const { host, port } = listeningAddr.toOptions() - server.listen(port, host, err => { + const options = multiaddrToNetConfig(listeningAddr) + server.listen(options, err => { if (err) return reject(err) log('Listening on %s', server.address()) resolve() @@ -76,9 +77,9 @@ module.exports = ({ handler, upgrader }, options) => { // Because TCP will only return the IPv6 version // we need to capture from the passed multiaddr if (listeningAddr.toString().startsWith('/ip4')) { - addrs = addrs.concat(getMulitaddrs('ip4', address.address, address.port)) + addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) } else if (address.family === 'IPv6') { - addrs = addrs.concat(getMulitaddrs('ip6', address.address, address.port)) + addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) } return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma) @@ -87,30 +88,6 @@ module.exports = ({ handler, upgrader }, options) => { return listener } -function getMulitaddrs (proto, ip, port) { - const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`) - return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa) -} - -function isAnyAddr (ip) { - return ['0.0.0.0', '::'].includes(ip) -} - -/** - * @private - * @param {string} family One of ['IPv6', 'IPv4'] - * @returns {string[]} an array of ip address strings - */ -function getNetworkAddrs (family) { - return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => { - netAddrs.forEach(netAddr => { - // Add the ip of each matching network interface - if (netAddr.family === family) addresses.push(netAddr.address) - }) - return addresses - }, []) -} - function trackConn (server, maConn) { server.__connections.push(maConn) diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js index 2cdab2d..fd5b574 100644 --- a/src/socket-to-conn.js +++ b/src/socket-to-conn.js @@ -11,6 +11,15 @@ const { CLOSE_TIMEOUT } = require('./constants') module.exports = (socket, options) => { options = options || {} + // Check if we are connected on a unix path + if (options.listeningAddr && options.listeningAddr.getPath()) { + options.remoteAddr = options.listeningAddr + } + + if (options.remoteAddr && options.remoteAddr.getPath()) { + options.localAddr = options.remoteAddr + } + const { sink, source } = toIterable.duplex(socket) const maConn = { async sink (source) { @@ -40,7 +49,7 @@ module.exports = (socket, options) => { conn: socket, - localAddr: toMultiaddr(socket.localAddress, socket.localPort), + localAddr: options.localAddr || toMultiaddr(socket.localAddress, socket.localPort), // If the remote address was passed, use it - it may have the peer ID encapsulated remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort), diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 0000000..9f6fd87 --- /dev/null +++ b/src/utils.js @@ -0,0 +1,46 @@ +'use strict' + +const multiaddr = require('multiaddr') +const os = require('os') +const { resolve } = require('path') +const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } + +function multiaddrToNetConfig (addr) { + const listenPath = addr.getPath() + // unix socket listening + if (listenPath) { + return resolve(listenPath) + } + // tcp listening + return addr.toOptions() +} + +function getMultiaddrs (proto, ip, port) { + const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`) + return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa) +} + +function isAnyAddr (ip) { + return ['0.0.0.0', '::'].includes(ip) +} + +/** + * @private + * @param {string} family One of ['IPv6', 'IPv4'] + * @returns {string[]} an array of ip address strings + */ +function getNetworkAddrs (family) { + return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => { + netAddrs.forEach(netAddr => { + // Add the ip of each matching network interface + if (netAddr.family === family) addresses.push(netAddr.address) + }) + return addresses + }, []) +} + +module.exports = { + multiaddrToNetConfig, + isAnyAddr, + getMultiaddrs +} diff --git a/test/compliance.spec.js b/test/compliance.spec.js index 09b334f..2ed7818 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -2,7 +2,7 @@ 'use strict' const sinon = require('sinon') -const tests = require('interface-transport') +const tests = require('libp2p-interfaces/src/transport/tests') const multiaddr = require('multiaddr') const net = require('net') const TCP = require('../src') diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index 4f90413..4e70168 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -7,10 +7,15 @@ const expect = chai.expect chai.use(dirtyChai) const TCP = require('../src') const net = require('net') +const os = require('os') +const path = require('path') const multiaddr = require('multiaddr') const pipe = require('it-pipe') const { collect, map } = require('streaming-iterables') const isCI = process.env.CI +const isWindows = os.platform() === 'win32' + +const skipOnWindows = isWindows ? it.skip : it describe('construction', () => { it('requires an upgrader', () => { @@ -20,6 +25,7 @@ describe('construction', () => { describe('listen', () => { let tcp + let listener beforeEach(() => { tcp = new TCP({ @@ -29,10 +35,13 @@ describe('listen', () => { } }) }) + afterEach(async () => { + listener && await listener.close() + }) it('close listener with connections, through timeout', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = tcp.createListener((conn) => { + listener = tcp.createListener((conn) => { pipe(conn, conn) }) @@ -54,11 +63,18 @@ describe('listen', () => { }) }) + // Windows doesn't support unix paths + skipOnWindows('listen on path', async () => { + const mh = multiaddr(`/unix${path.resolve(os.tmpdir(), '/tmp/p2pd.sock')}`) + + listener = tcp.createListener((conn) => {}) + await listener.listen(mh) + }) + it('listen on port 0', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) - await listener.close() }) it('listen on IPv6 addr', async () => { @@ -66,75 +82,63 @@ describe('listen', () => { return } const mh = multiaddr('/ip6/::/tcp/9090') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) - await listener.close() }) it('listen on any Interface', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) - await listener.close() }) it('getAddrs', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) expect(multiaddrs[0]).to.deep.equal(mh) - - await listener.close() }) it('getAddrs on port 0 listen', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) - - await listener.close() }) it('getAddrs from listening on 0.0.0.0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length > 0).to.equal(true) expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - - await listener.close() }) it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/0') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length > 0).to.equal(true) expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - - await listener.close() }) it('getAddrs preserves IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = tcp.createListener((conn) => {}) + listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) expect(multiaddrs[0]).to.deep.equal(mh) - - await listener.close() }) }) @@ -192,6 +196,27 @@ describe('dial', () => { await listener.close() }) + // Windows doesn't support unix paths + skipOnWindows('dial on path', async () => { + const ma = multiaddr(`/unix${path.resolve(os.tmpdir(), '/tmp/p2pd.sock')}`) + + const listener = tcp.createListener((conn) => { + pipe(conn, conn) + }) + await listener.listen(ma) + + const connection = await tcp.dial(ma) + + const values = await pipe( + ['hey'], + connection, + collect + ) + + expect(values).to.be.eql([Buffer.from('hey')]) + await listener.close() + }) + it('dial and destroy on listener', async () => { let handled const handledPromise = new Promise((resolve) => {