Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat(pull): migrate to pull streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 5, 2016
1 parent 3c3a707 commit 3f58dca
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 199 deletions.
4 changes: 3 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const gulp = require('gulp')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const WS = require('./src')

let listener
Expand All @@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => {
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})
Expand Down
14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"detect-node": "^2.0.3",
"interface-connection": "^0.1.8",
"interface-connection": "^0.2.1",
"lodash.contains": "^2.4.3",
"mafmt": "^2.1.1",
"run-parallel": "^1.1.6",
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.4"
"pull-ws": "^3.2.3"
},
"devDependencies": {
"aegir": "^6.0.1",
"multiaddr": "^2.0.2",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"interface-transport": "^0.2.0",
"pre-commit": "^1.1.3"
"interface-transport": "^0.3.1",
"multiaddr": "^2.0.2",
"pre-commit": "^1.1.3",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.4.3"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand Down
130 changes: 19 additions & 111 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,147 +1,55 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:websockets')
const SW = require('simple-websocket')
const isNode = require('detect-node')
let SWS
if (isNode) {
SWS = require('simple-websocket-server')
} else {
SWS = {}
}
const connect = require('pull-ws/client')
const mafmt = require('mafmt')
const contains = require('lodash.contains')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')

const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421

exports = module.exports = WebSockets

function WebSockets () {
if (!(this instanceof WebSockets)) {
return new WebSockets()
}
const createListener = require('./listener')

this.dial = function (ma, options, callback) {
module.exports = class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

if (!callback) {
callback = function noop () {}
callback = () => {}
}

const maOpts = ma.toOptions()

const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)

const conn = new Connection(socket)

socket.on('timeout', () => {
conn.emit('timeout')
})

socket.on('error', (err) => {
callback(err)
conn.emit('error', err)
})

socket.on('connect', () => {
callback(null, conn)
conn.emit('connect')
const url = `ws://${maOpts.host}:${maOpts.port}`
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: callback
})

conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)

return conn
}

this.createListener = (options, handler) => {
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

const listener = SWS.createServer((socket) => {
const conn = new Connection(socket)

conn.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(conn)
})

let listeningMultiaddr

listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}

listeningMultiaddr = ma

if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), callback)
}

listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = { timeout: CLOSE_TIMEOUT }
}
if (!callback) { callback = function noop () {} }
if (!options) { options = { timeout: CLOSE_TIMEOUT } }

let closed = false
listener.once('close', () => {
closed = true
})
listener._close(callback)
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}

// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
listener.__connections[key] = socket

socket.on('close', () => {
delete listener.__connections[key]
})
})

listener.getAddrs = (callback) => {
callback(null, [listeningMultiaddr])
}

return listener
return createListener(options, handler)
}

this.filter = (multiaddrs) => {
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}

return multiaddrs.filter((ma) => {
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
Expand Down
46 changes: 46 additions & 0 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const isNode = require('detect-node')
const Connection = require('interface-connection').Connection
const contains = require('lodash.contains')

// const IPFS_CODE = 421

let createServer

if (isNode) {
createServer = require('pull-ws/server')
} else {
createServer = () => {}
}

module.exports = (options, handler) => {
const listener = createServer((socket) => {
socket.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}

handler(new Connection(socket))
})

let listeningMultiaddr

listener._listen = listener.listen
listener.listen = (ma, cb) => {
cb = cb || (() => {})
listeningMultiaddr = ma

if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), cb)
}

listener.getAddrs = (cb) => {
cb(null, [listeningMultiaddr])
}

return listener
}
87 changes: 40 additions & 47 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,67 @@

const expect = require('chai').expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')

const WS = require('../src')

describe('libp2p-websockets', () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
let ws
let conn

it('create', (done) => {
beforeEach((done) => {
ws = new WS()
expect(ws).to.exist
done()
conn = ws.dial(ma, done)
})

it('echo', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(ma)
const message = 'Hello World!'
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
done()
})
})

describe('stress', () => {
it('one big write', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
const message = new Buffer(1000000).fill('a').toString('hex')
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
const s = goodbye({
source: pull.values([message]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([message])
done()
})
})

it('many writes in 2 batches', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
let expected = ''
let counter = 0
while (++counter < 10000) {
conn.write(`${counter} `)
expected += `${counter} `
}

setTimeout(() => {
while (++counter < 20000) {
conn.write(`${counter} `)
expected += `${counter} `
}
pull(s, conn, s)
})

conn.write('STOP')
}, 1000)
describe('stress', () => {
it('one big write', (done) => {
const rawMessage = new Buffer(1000000).fill('a')

let result = ''
conn.on('data', (data) => {
if (data.toString() === 'STOP') {
conn.end()
return
}
result += data.toString()
const s = goodbye({
source: pull.values([rawMessage]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([rawMessage])
done()
})
})
pull(s, conn, s)
})

conn.on('end', () => {
expect(result).to.equal(expected)
done()
it('many writes', (done) => {
const s = goodbye({
source: pull(
pull.infinite(),
pull.take(1000),
pull.map((val) => Buffer(val.toString()))
),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.have.length(1000)
done()
})
})

pull(s, conn, s)
})
})
})
23 changes: 23 additions & 0 deletions test/compliance.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* eslint-env mocha */
'use strict'

const tests = require('interface-transport')
const multiaddr = require('multiaddr')
const Ws = require('../src')

describe('compliance', () => {
tests({
setup (cb) {
let ws = new Ws()
const addrs = [
multiaddr('/ip4/127.0.0.1/tcp/9091/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9092/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9093/ws')
]
cb(null, ws, addrs)
},
teardown (cb) {
cb()
}
})
})
Loading

0 comments on commit 3f58dca

Please sign in to comment.