Skip to content

Commit

Permalink
feat: add support for message signing
Browse files Browse the repository at this point in the history
BREAKING CHANGE: as .publish should now sign messages (via _buildMessage) it now requires a callback since signing is async. This also adds an options param to the pubsub constructor to allow for disabling signing. While this change shouldnt break things upstream, implementations need to be sure to call _buildMessage for each message they will publish.
  • Loading branch information
jacobheun committed May 6, 2019
1 parent dda1894 commit 5cb17fd
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 22 deletions.
32 changes: 30 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const errcode = require('err-code')

const Peer = require('./peer')
const message = require('./message')
const { signMessage } = require('./message/sign')
const utils = require('./utils')

const nextTick = require('async/nextTick')
Expand All @@ -22,17 +23,28 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {String} debugName
* @param {String} multicodec
* @param {Object} libp2p libp2p implementation
* @param {Object} options
* @param {boolean} options.signMessages if messages should be signed, defaults to true
* @constructor
*/
constructor (debugName, multicodec, libp2p) {
constructor (debugName, multicodec, libp2p, options) {
super()

options = {
signMessages: true,
...options
}

this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`)
this.multicodec = multicodec
this.libp2p = libp2p
this.started = false

if (options.signMessages) {
this.peerId = this.libp2p.peerInfo.id
}

/**
* Map of topics to which peers are subscribed to
*
Expand Down Expand Up @@ -225,16 +237,32 @@ class PubsubBaseProtocol extends EventEmitter {
this._removePeer(peer)
}

/**
* Normalizes the message and signs it, if signing is enabled
*
* @param {Message} message
* @param {function(Error, Message)} callback
*/
_buildMessage (message, callback) {
const msg = utils.normalizeOutRpcMessage(message)
if (this.peerId) {
signMessage(this.peerId, msg, callback)
} else {
nextTick(callback, null, msg)
}
}

/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @param {function(Error)} callback
* @returns {undefined}
*
*/
publish (topics, messages) {
publish (topics, messages, callback) {
throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

Expand Down
4 changes: 4 additions & 0 deletions src/message/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
const protons = require('protons')

const rpcProto = protons(require('./rpc.proto.js'))
const RPC = rpcProto.RPC
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))

exports = module.exports
exports.rpc = rpcProto
exports.td = topicDescriptorProto
exports.RPC = RPC
exports.Message = RPC.Message
exports.SubOpts = RPC.SubOpts
4 changes: 3 additions & 1 deletion src/message/rpc.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ message RPC {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
}`
33 changes: 33 additions & 0 deletions src/message/sign.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const { Message } = require('./index')
const SignPrefix = Buffer.from('libp2p-pubsub:')

module.exports.SignPrefix = SignPrefix

/**
* Signs the provided message with the given `peerId`
*
* @param {PeerId} peerId
* @param {Message} message
* @param {function(Error, Message)} callback
* @returns {void}
*/
module.exports.signMessage = function (peerId, message, callback) {
// Get the message in bytes, and prepend with the pubsub prefix
const bytes = Buffer.concat([
SignPrefix,
Message.encode(message)
])

// Sign the bytes with the private key
peerId.privKey.sign(bytes, (err, signature) => {
if (err) return callback(err)

callback(null, {
...message,
signature: signature,
key: peerId.pubKey.bytes
})
})
}
6 changes: 3 additions & 3 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const EventEmitter = require('events')

const rpc = require('./message').rpc.RPC
const { RPC } = require('./message')

/**
* The known state of a connected peer.
Expand Down Expand Up @@ -109,7 +109,7 @@ class Peer extends EventEmitter {
})
})

this.write(rpc.encode({
this.write(RPC.encode({
subscriptions: subs
}))
}
Expand Down Expand Up @@ -139,7 +139,7 @@ class Peer extends EventEmitter {
* @returns {undefined}
*/
sendMessages (msgs) {
this.write(rpc.encode({
this.write(RPC.encode({
msgs: msgs
}))
}
Expand Down
16 changes: 9 additions & 7 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ exports.normalizeInRpcMessages = (messages) => {
})
}

exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) {
m.from = bs58.decode(message.from)
}
return m
}

exports.normalizeOutRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (typeof msg.from === 'string' || msg.from instanceof String) {
m.from = bs58.decode(msg.from)
}
return m
})
return messages.map(exports.normalizeOutRpcMessage)
}
44 changes: 35 additions & 9 deletions test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ const expect = chai.expect
const series = require('async/series')
const parallel = require('async/parallel')

const { Message } = require('../src/message')
const { SignPrefix } = require('../src/message/sign')
const PubsubBaseProtocol = require('../src')
const { randomSeqno, normalizeOutRpcMessage } = require('../src/utils')
const utils = require('./utils')
const createNode = utils.createNode

Expand Down Expand Up @@ -55,14 +58,7 @@ describe('pubsub base protocol', () => {
})
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

it('mount the pubsub protocol', (done) => {
before('mount the pubsub protocol', (done) => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

Expand All @@ -73,13 +69,20 @@ describe('pubsub base protocol', () => {
}, 50)
})

it('start both Pubsub', (done) => {
before('start both Pubsub', (done) => {
parallel([
(cb) => psA.start(cb),
(cb) => psB.start(cb)
], done)
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

it('Dial from nodeA to nodeB', (done) => {
series([
(cb) => nodeA.dial(nodeB.peerInfo, cb),
Expand All @@ -90,6 +93,29 @@ describe('pubsub base protocol', () => {
}, 1000)
], done)
})

it('_buildMessage normalizes and signs messages', (done) => {
const message = {
from: 'QmABC',
data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic']
}

psA._buildMessage(message, (err, signedMessage) => {
expect(err).to.not.exist()

const bytesToSign = Buffer.concat([
SignPrefix,
Message.encode(normalizeOutRpcMessage(message))
])

psA.peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
expect(verified).to.eql(true)
done(err)
})
})
})
})

describe('dial the pubsub protocol on mount', () => {
Expand Down
53 changes: 53 additions & 0 deletions test/sign.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect

const { Message } = require('../src/message')
const { signMessage, SignPrefix } = require('../src/message/sign')
const PeerId = require('peer-id')
const { randomSeqno } = require('../src/utils')

describe('message signing', () => {
let peerId
before((done) => {
peerId = PeerId.create({
bits: 1024
}, (err, id) => {
peerId = id
done(err)
})
})

it('should be able to sign a message', (done) => {
const message = {
from: 'QmABC',
data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic']
}

const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)])

peerId.privKey.sign(bytesToSign, (err, expectedSignature) => {
if (err) return done(err)

signMessage(peerId, message, (err, signedMessage) => {
if (err) return done(err)

// Check the signature and public key
expect(signedMessage.signature).to.eql(expectedSignature)
expect(signedMessage.key).to.eql(peerId.pubKey.bytes)

// Verify the signature
peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
expect(verified).to.eql(true)
done(err)
})
})
})
})
})

0 comments on commit 5cb17fd

Please sign in to comment.