Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

feat: add path multiaddr support #118

Merged
merged 6 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,23 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^20.3.1",
"aegir": "^20.4.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-transport": "^0.7.0",
"sinon": "^7.3.1"
"libp2p-interfaces": "^0.1.6",
"it-pipe": "^1.1.0",
"sinon": "^7.5.0",
"streaming-iterables": "^4.1.1"
},
"dependencies": {
"abortable-iterator": "^2.1.0",
"abortable-iterator": "^3.0.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"libp2p-utils": "~0.1.0",
"mafmt": "^7.0.0",
"multiaddr": "^7.1.0",
"stream-to-it": "^0.1.1"
"multiaddr": "^7.2.1",
"stream-to-it": "^0.2.0"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
9 changes: 5 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const errCode = require('err-code')
const log = require('debug')('libp2p:tcp')
const toConnection = require('./socket-to-conn')
const createListener = require('./listener')
const { multiaddrToNetConfig } = require('./utils')
const { AbortError } = require('abortable-iterator')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
const assert = require('assert')
Expand Down Expand Up @@ -56,9 +57,9 @@ class TCP {

return new Promise((resolve, reject) => {
const start = Date.now()
const cOpts = ma.toOptions()
const cOpts = multiaddrToNetConfig(ma)

log('dialing %s:%s', cOpts.host, cOpts.port)
log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)

const onError = err => {
Expand All @@ -74,12 +75,12 @@ class TCP {
}

const onConnect = () => {
log('connection opened %s:%s', cOpts.host, cOpts.port)
log('connection opened %j', cOpts)
done()
}

const onAbort = () => {
log('connection aborted %s:%s', cOpts.host, cOpts.port)
log('connection aborted %j', cOpts)
rawSocket.destroy()
done(new AbortError())
}
Expand Down
41 changes: 9 additions & 32 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'

const multiaddr = require('multiaddr')
const os = require('os')
const net = require('net')
const EventEmitter = require('events')
const log = require('debug')('libp2p:tcp:listener')
const toConnection = require('./socket-to-conn')
const { CODE_P2P } = require('./constants')
const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }
const {
getMultiaddrs,
multiaddrToNetConfig
} = require('./utils')

module.exports = ({ handler, upgrader }, options) => {
const listener = new EventEmitter()
Expand All @@ -16,7 +17,7 @@ module.exports = ({ handler, upgrader }, options) => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => log('socket error', err))

const maConn = toConnection(socket)
const maConn = toConnection(socket, { listeningAddr })
log('new inbound connection %s', maConn.remoteAddr)

const conn = await upgrader.upgradeInbound(maConn)
Expand Down Expand Up @@ -56,8 +57,8 @@ module.exports = ({ handler, upgrader }, options) => {
}

return new Promise((resolve, reject) => {
const { host, port } = listeningAddr.toOptions()
server.listen(port, host, err => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, err => {
if (err) return reject(err)
log('Listening on %s', server.address())
resolve()
Expand All @@ -76,9 +77,9 @@ module.exports = ({ handler, upgrader }, options) => {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMulitaddrs('ip4', address.address, address.port))
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMulitaddrs('ip6', address.address, address.port))
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
Expand All @@ -87,30 +88,6 @@ module.exports = ({ handler, upgrader }, options) => {
return listener
}

function getMulitaddrs (proto, ip, port) {
const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`)
return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa)
}

function isAnyAddr (ip) {
return ['0.0.0.0', '::'].includes(ip)
}

/**
* @private
* @param {string} family One of ['IPv6', 'IPv4']
* @returns {string[]} an array of ip address strings
*/
function getNetworkAddrs (family) {
return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => {
netAddrs.forEach(netAddr => {
// Add the ip of each matching network interface
if (netAddr.family === family) addresses.push(netAddr.address)
})
return addresses
}, [])
}

function trackConn (server, maConn) {
server.__connections.push(maConn)

Expand Down
11 changes: 10 additions & 1 deletion src/socket-to-conn.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ const { CLOSE_TIMEOUT } = require('./constants')
module.exports = (socket, options) => {
options = options || {}

// Check if we are connected on a unix path
if (options.listeningAddr && options.listeningAddr.getPath()) {
options.remoteAddr = options.listeningAddr
}

if (options.remoteAddr && options.remoteAddr.getPath()) {
options.localAddr = options.remoteAddr
}

const { sink, source } = toIterable.duplex(socket)
const maConn = {
async sink (source) {
Expand Down Expand Up @@ -40,7 +49,7 @@ module.exports = (socket, options) => {

conn: socket,

localAddr: toMultiaddr(socket.localAddress, socket.localPort),
localAddr: options.localAddr || toMultiaddr(socket.localAddress, socket.localPort),

// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),
Expand Down
46 changes: 46 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const multiaddr = require('multiaddr')
const os = require('os')
const { resolve } = require('path')
const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }

function multiaddrToNetConfig (addr) {
const listenPath = addr.getPath()
// unix socket listening
if (listenPath) {
return resolve(listenPath)
}
// tcp listening
return addr.toOptions()
}

function getMultiaddrs (proto, ip, port) {
const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`)
return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa)
}

function isAnyAddr (ip) {
return ['0.0.0.0', '::'].includes(ip)
}

/**
* @private
* @param {string} family One of ['IPv6', 'IPv4']
* @returns {string[]} an array of ip address strings
*/
function getNetworkAddrs (family) {
return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => {
netAddrs.forEach(netAddr => {
// Add the ip of each matching network interface
if (netAddr.family === family) addresses.push(netAddr.address)
})
return addresses
}, [])
}

module.exports = {
multiaddrToNetConfig,
isAnyAddr,
getMultiaddrs
}
2 changes: 1 addition & 1 deletion test/compliance.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
'use strict'

const sinon = require('sinon')
const tests = require('interface-transport')
const tests = require('libp2p-interfaces/src/transport/tests')
const multiaddr = require('multiaddr')
const net = require('net')
const TCP = require('../src')
Expand Down
69 changes: 47 additions & 22 deletions test/listen-dial.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ const expect = chai.expect
chai.use(dirtyChai)
const TCP = require('../src')
const net = require('net')
const os = require('os')
const path = require('path')
const multiaddr = require('multiaddr')
const pipe = require('it-pipe')
const { collect, map } = require('streaming-iterables')
const isCI = process.env.CI
const isWindows = os.platform() === 'win32'

describe('construction', () => {
it('requires an upgrader', () => {
Expand All @@ -20,6 +23,7 @@ describe('construction', () => {

describe('listen', () => {
let tcp
let listener

beforeEach(() => {
tcp = new TCP({
Expand All @@ -29,10 +33,13 @@ describe('listen', () => {
}
})
})
afterEach(async () => {
listener && await listener.close()
})

it('close listener with connections, through timeout', async () => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = tcp.createListener((conn) => {
listener = tcp.createListener((conn) => {
pipe(conn, conn)
})

Expand All @@ -54,87 +61,83 @@ describe('listen', () => {
})
})

it('listen on path', async function () {
// Windows doesn't support unix paths
if (isWindows) return this.skip()
const mh = multiaddr(`/unix${path.resolve(os.tmpdir(), '/tmp/p2pd.sock')}`)

listener = tcp.createListener((conn) => {})
await listener.listen(mh)
})

it('listen on port 0', async () => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)
await listener.close()
})

it('listen on IPv6 addr', async () => {
if (isCI) {
return
}
const mh = multiaddr('/ip6/::/tcp/9090')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)
await listener.close()
})

it('listen on any Interface', async () => {
const mh = multiaddr('/ip4/0.0.0.0/tcp/9090')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)
await listener.close()
})

it('getAddrs', async () => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)

const multiaddrs = listener.getAddrs()
expect(multiaddrs.length).to.equal(1)
expect(multiaddrs[0]).to.deep.equal(mh)

await listener.close()
})

it('getAddrs on port 0 listen', async () => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)

const multiaddrs = listener.getAddrs()
expect(multiaddrs.length).to.equal(1)

await listener.close()
})

it('getAddrs from listening on 0.0.0.0', async () => {
const mh = multiaddr('/ip4/0.0.0.0/tcp/9090')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)

const multiaddrs = listener.getAddrs()
expect(multiaddrs.length > 0).to.equal(true)
expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1)

await listener.close()
})

it('getAddrs from listening on 0.0.0.0 and port 0', async () => {
const mh = multiaddr('/ip4/0.0.0.0/tcp/0')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)

const multiaddrs = listener.getAddrs()
expect(multiaddrs.length > 0).to.equal(true)
expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1)

await listener.close()
})

it('getAddrs preserves IPFS Id', async () => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = tcp.createListener((conn) => {})
listener = tcp.createListener((conn) => {})
await listener.listen(mh)

const multiaddrs = listener.getAddrs()
expect(multiaddrs.length).to.equal(1)
expect(multiaddrs[0]).to.deep.equal(mh)

await listener.close()
})
})

Expand Down Expand Up @@ -192,6 +195,28 @@ describe('dial', () => {
await listener.close()
})

it('dial on path', async () => {
// Windows doesn't support unix paths
if (isWindows) return this.skip()
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
const ma = multiaddr(`/unix${path.resolve(os.tmpdir(), '/tmp/p2pd.sock')}`)

const listener = tcp.createListener((conn) => {
pipe(conn, conn)
})
await listener.listen(ma)

const connection = await tcp.dial(ma)

const values = await pipe(
['hey'],
connection,
collect
)

expect(values).to.be.eql([Buffer.from('hey')])
await listener.close()
})

it('dial and destroy on listener', async () => {
let handled
const handledPromise = new Promise((resolve) => {
Expand Down