Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use libp2p 0.28.x #217

Merged
merged 8 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p": "^0.28.0",
"libp2p-kad-dht": "^0.19.1",
"libp2p-mplex": "^0.9.2",
"libp2p-secio": "^0.12.1",
"libp2p-tcp": "^0.14.2",
Expand All @@ -65,9 +65,7 @@
"p-defer": "^3.0.0",
"p-event": "^4.1.0",
"p-wait-for": "^3.1.0",
"peer-book": "~0.9.0",
"peer-id": "^0.13.5",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
Expand All @@ -84,6 +82,7 @@
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^0.3.0",
"moving-average": "^1.0.0",
"multicodec": "^1.0.0",
"multihashing-async": "^0.8.0",
Expand Down
12 changes: 6 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const statsKeys = [
class Bitswap {
constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)
this._log = logger(this.peerId)

this._options = Object.assign({}, defaultOptions, options)

Expand All @@ -54,16 +54,16 @@ class Bitswap {
// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats)
this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats)

// handle message sending
this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)
this.wm = new WantManager(this.peerId, this.network, this._stats)

this.notifications = new Notifications(this.peerInfo.id)
this.notifications = new Notifications(this.peerId)
}

get peerInfo () {
return this._libp2p.peerInfo
get peerId () {
return this._libp2p.peerId
}

// handle messages received through the network
Expand Down
35 changes: 22 additions & 13 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const Message = require('./types/message')
const CONSTANTS = require('./constants')
const logger = require('./utils').logger
Expand All @@ -13,7 +15,7 @@ const BITSWAP120 = '/ipfs/bitswap/1.2.0'

class Network {
constructor (libp2p, bitswap, options, stats) {
this._log = logger(libp2p.peerInfo.id, 'network')
this._log = logger(libp2p.peerId, 'network')
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
Expand All @@ -37,14 +39,21 @@ class Network {
this._running = true
this.libp2p.handle(this.protocols, this._onConnection)

this.libp2p.on('peer:connect', this._onPeerConnect)
this.libp2p.on('peer:disconnect', this._onPeerDisconnect)
// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: this.protocols,
handlers: {
onConnect: this._onPeerConnect,
onDisconnect: this._onPeerDisconnect
}
})
this._registrarId = this.libp2p.registrar.register(topology)

// All existing connections are like new ones for us
for (const peer of this.libp2p.peerStore.peers.values()) {
if (this.libp2p.registrar.getConnection(peer)) {
this._onPeerConnect(peer)
}
const conn = this.libp2p.connectionManager.get(peer.id)

conn && this._onPeerConnect(conn)
}
}

Expand All @@ -54,8 +63,8 @@ class Network {
// Unhandle both, libp2p doesn't care if it's not already handled
this.libp2p.unhandle(this.protocols)

this.libp2p.removeListener('peer:connect', this._onPeerConnect)
this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect)
// unregister protocol and handlers
this.libp2p.registrar.unregister(this._registrarId)
}

/**
Expand Down Expand Up @@ -92,12 +101,12 @@ class Network {
}
}

_onPeerConnect (peerInfo) {
this.bitswap._onPeerConnected(peerInfo.id)
_onPeerConnect (peerId) {
this.bitswap._onPeerConnected(peerId)
}

_onPeerDisconnect (peerInfo) {
this.bitswap._onPeerDisconnected(peerInfo.id)
_onPeerDisconnect (peerId) {
this.bitswap._onPeerDisconnected(peerId)
}

/**
Expand Down Expand Up @@ -181,7 +190,7 @@ class Network {
/**
* Connects to another peer
*
* @param {PeerInfo|PeerId|Multiaddr} peer
* @param {PeerId|Multiaddr} peer
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Connection>}
Expand Down
5 changes: 3 additions & 2 deletions test/bitswap-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ describe('bitswap stats', () => {
bs2 = bitswaps[1]
bs2.start()

await libp2pNodes[0].dial(libp2pNodes[1].peerInfo)
const ma = `${libp2pNodes[1].multiaddrs[0]}/p2p/${libp2pNodes[1].peerId.toB58String()}`
await libp2pNodes[0].dial(ma)

block = await makeBlock()

Expand Down Expand Up @@ -212,7 +213,7 @@ describe('bitswap stats', () => {
})

it('has peer stats', async () => {
const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerInfo.id)
const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerId)
expect(peerStats).to.exist()

const stats = await pEvent(peerStats, 'update')
Expand Down
28 changes: 21 additions & 7 deletions test/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
'use strict'

const { expect } = require('aegir/utils/chai')
const delay = require('delay')
const PeerId = require('peer-id')
const sinon = require('sinon')
const pWaitFor = require('p-wait-for')

const Bitswap = require('../src')

Expand Down Expand Up @@ -38,9 +38,12 @@ describe('bitswap without DHT', function () {
])

// connect 0 -> 1 && 1 -> 2
const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}`
const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}`

await Promise.all([
nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo),
nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo)
nodes[0].libp2pNode.dial(ma1),
nodes[1].libp2pNode.dial(ma2)
])
})

Expand Down Expand Up @@ -132,9 +135,19 @@ describe('bitswap with DHT', function () {
])

// connect 0 -> 1 && 1 -> 2
const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}`
const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}`

await Promise.all([
nodes[0].libp2pNode.dial(ma1),
nodes[1].libp2pNode.dial(ma2)
])

// await dht routing table are updated
await Promise.all([
nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo),
nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo)
pWaitFor(() => nodes[0].libp2pNode._dht.routingTable.size >= 1),
pWaitFor(() => nodes[1].libp2pNode._dht.routingTable.size >= 2),
pWaitFor(() => nodes[2].libp2pNode._dht.routingTable.size >= 1)
])
})

Expand All @@ -148,10 +161,11 @@ describe('bitswap with DHT', function () {

it('put a block in 2, get it in 0', async () => {
const block = await makeBlock()
const provideSpy = sinon.spy(nodes[2].libp2pNode._dht, 'provide')
await nodes[2].bitswap.put(block)

// Give put time to process
await delay(100)
// wait for the DHT to finish providing
await provideSpy.returnValues[0]

const blockRetrieved = await nodes[0].bitswap.get(block.cid)
expect(block.data).to.eql(blockRetrieved.data)
Expand Down
29 changes: 17 additions & 12 deletions test/network/network.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('network', () => {

it('connectTo fail', async () => {
try {
await networkA.connectTo(p2pB.peerInfo.id)
await networkA.connectTo(p2pB.peerId)
assert.fail()
} catch (err) {
expect(err).to.exist()
Expand All @@ -87,24 +87,26 @@ describe('network', () => {
var counter = 0

bitswapMockA._onPeerConnected = (peerId) => {
expect(peerId.toB58String()).to.equal(p2pB.peerInfo.id.toB58String())
expect(peerId.toB58String()).to.equal(p2pB.peerId.toB58String())
counter++
}

bitswapMockB._onPeerConnected = (peerId) => {
expect(peerId.toB58String()).to.equal(p2pA.peerInfo.id.toB58String())
expect(peerId.toB58String()).to.equal(p2pA.peerId.toB58String())
counter++
}

await p2pA.dial(p2pB.peerInfo)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
await p2pA.dial(ma)

await pWaitFor(() => counter >= 2)
bitswapMockA._onPeerConnected = () => {}
bitswapMockB._onPeerConnected = () => {}
})

it('connectTo success', async () => {
await networkA.connectTo(p2pB.peerInfo)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
await networkA.connectTo(ma)
})

const versions = [{
Expand Down Expand Up @@ -134,7 +136,8 @@ describe('network', () => {

bitswapMockB._receiveError = (err) => deferred.reject(err)

const { stream } = await p2pA.dialProtocol(p2pB.peerInfo, '/ipfs/bitswap/' + version.num)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
const { stream } = await p2pA.dialProtocol(ma, '/ipfs/bitswap/' + version.num)
await pipe(
[version.serialize(msg)],
lp.encode(),
Expand Down Expand Up @@ -165,11 +168,12 @@ describe('network', () => {

bitswapMockB._receiveError = deferred.reject

await networkA.sendMessage(p2pB.peerInfo.id, msg)
await networkA.sendMessage(p2pB.peerId, msg)
})

it('dial to peer on Bitswap 1.0.0', async () => {
const { protocol } = await p2pA.dialProtocol(p2pC.peerInfo, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0'])
const ma = `${p2pC.multiaddrs[0]}/p2p/${p2pC.peerId.toB58String()}`
const { protocol } = await p2pA.dialProtocol(ma, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0'])

expect(protocol).to.equal('/ipfs/bitswap/1.0.0')
})
Expand All @@ -194,7 +198,7 @@ describe('network', () => {

bitswapMockC._receiveError = deferred.reject

await networkA.sendMessage(p2pC.peerInfo.id, msg)
await networkA.sendMessage(p2pC.peerId, msg)
await deferred.promise
})

Expand All @@ -208,16 +212,17 @@ describe('network', () => {
networkA.start()
networkB.start()

// FIXME: have to already be connected as sendMessage only accepts a peer id, not a PeerInfo
await p2pA.dial(p2pB.peerInfo)
// In a real network scenario, peers will be discovered and their addresses
// will be added to the addressBook before bitswap kicks in
p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs)

const deferred = pDefer()

bitswapMockB._receiveMessage = () => {
deferred.resolve()
}

await networkA.sendMessage(p2pB.peerInfo.id, new Message(true))
await networkA.sendMessage(p2pB.peerId, new Message(true))

return deferred
})
Expand Down
2 changes: 1 addition & 1 deletion test/utils/connect-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const without = require('lodash.without')
module.exports = async (nodes) => {
for (const node of nodes) {
for (const otherNode of without(nodes, node)) {
await node.libp2pNode.dial(otherNode.bitswap.peerInfo)
await node.libp2pNode.dial(otherNode.bitswap.peerId)
}
}
}
13 changes: 8 additions & 5 deletions test/utils/create-libp2p-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const libp2p = require('libp2p')
const KadDHT = require('libp2p-kad-dht')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')

const defaultsDeep = require('@nodeutils/defaults-deep')

class Node extends libp2p {
Expand Down Expand Up @@ -38,10 +38,13 @@ class Node extends libp2p {

async function createLibp2pNode (options = {}) {
const id = await PeerId.create({ bits: 512 })
const peerInfo = new PeerInfo(id)
peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
options.peerInfo = peerInfo
const node = new Node(options)
const node = new Node({
peerId: id,
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0']
},
...options
})
await node.start()

return node
Expand Down
Loading