diff --git a/src/index.js b/src/index.js index 1a9c0cd714..271cf3e105 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ const errcode = require('err-code') const Peer = require('./peer') const message = require('./message') +const { signMessage } = require('./message/sign') const utils = require('./utils') const nextTick = require('async/nextTick') @@ -22,17 +23,28 @@ class PubsubBaseProtocol extends EventEmitter { * @param {String} debugName * @param {String} multicodec * @param {Object} libp2p libp2p implementation + * @param {Object} options + * @param {boolean} options.signMessages if messages should be signed, defaults to true * @constructor */ - constructor (debugName, multicodec, libp2p) { + constructor (debugName, multicodec, libp2p, options) { super() + options = { + signMessages: true, + ...options + } + this.log = debug(debugName) this.log.err = debug(`${debugName}:error`) this.multicodec = multicodec this.libp2p = libp2p this.started = false + if (options.signMessages) { + this.peerId = this.libp2p.peerInfo.id + } + /** * Map of topics to which peers are subscribed to * @@ -225,16 +237,32 @@ class PubsubBaseProtocol extends EventEmitter { this._removePeer(peer) } + /** + * Normalizes the message and signs it, if signing is enabled + * + * @param {Message} message + * @param {function(Error, Message)} callback + */ + _buildMessage (message, callback) { + const msg = utils.normalizeOutRpcMessage(message) + if (this.peerId) { + signMessage(this.peerId, msg, callback) + } else { + nextTick(callback, null, msg) + } + } + /** * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. * For example, a Floodsub implementation might simply publish each message to each topic for every peer * @abstract * @param {Array|string} topics * @param {Array|any} messages + * @param {function(Error)} callback * @returns {undefined} * */ - publish (topics, messages) { + publish (topics, messages, callback) { throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') } diff --git a/src/message/index.js b/src/message/index.js index ed860a60ef..320ab4cf7d 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -3,8 +3,12 @@ const protons = require('protons') const rpcProto = protons(require('./rpc.proto.js')) +const RPC = rpcProto.RPC const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) exports = module.exports exports.rpc = rpcProto exports.td = topicDescriptorProto +exports.RPC = RPC +exports.Message = RPC.Message +exports.SubOpts = RPC.SubOpts diff --git a/src/message/rpc.proto.js b/src/message/rpc.proto.js index 6f917bfcef..88b1f83427 100644 --- a/src/message/rpc.proto.js +++ b/src/message/rpc.proto.js @@ -13,6 +13,8 @@ message RPC { optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - repeated string topicIDs = 4; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; } }` diff --git a/src/message/sign.js b/src/message/sign.js new file mode 100644 index 0000000000..a275214358 --- /dev/null +++ b/src/message/sign.js @@ -0,0 +1,33 @@ +'use strict' + +const { Message } = require('./index') +const SignPrefix = Buffer.from('libp2p-pubsub:') + +module.exports.SignPrefix = SignPrefix + +/** + * Signs the provided message with the given `peerId` + * + * @param {PeerId} peerId + * @param {Message} message + * @param {function(Error, Message)} callback + * @returns {void} + */ +module.exports.signMessage = function (peerId, message, callback) { + // Get the message in bytes, and prepend with the pubsub prefix + const bytes = Buffer.concat([ + SignPrefix, + Message.encode(message) + ]) + + // Sign the bytes with the private key + peerId.privKey.sign(bytes, (err, signature) => { + if (err) return callback(err) + + callback(null, { + ...message, + signature: signature, + key: peerId.pubKey.bytes + }) + }) +} diff --git a/src/peer.js b/src/peer.js index 45c4573222..7179b4e830 100644 --- a/src/peer.js +++ b/src/peer.js @@ -6,7 +6,7 @@ const pull = require('pull-stream') const setImmediate = require('async/setImmediate') const EventEmitter = require('events') -const rpc = require('./message').rpc.RPC +const { RPC } = require('./message') /** * The known state of a connected peer. @@ -109,7 +109,7 @@ class Peer extends EventEmitter { }) }) - this.write(rpc.encode({ + this.write(RPC.encode({ subscriptions: subs })) } @@ -139,7 +139,7 @@ class Peer extends EventEmitter { * @returns {undefined} */ sendMessages (msgs) { - this.write(rpc.encode({ + this.write(RPC.encode({ msgs: msgs })) } diff --git a/src/utils.js b/src/utils.js index 53c515216e..547aaf550d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -81,15 +81,17 @@ exports.normalizeInRpcMessages = (messages) => { }) } +exports.normalizeOutRpcMessage = (message) => { + const m = Object.assign({}, message) + if (typeof message.from === 'string' || message.from instanceof String) { + m.from = bs58.decode(message.from) + } + return m +} + exports.normalizeOutRpcMessages = (messages) => { if (!messages) { return messages } - return messages.map((msg) => { - const m = Object.assign({}, msg) - if (typeof msg.from === 'string' || msg.from instanceof String) { - m.from = bs58.decode(msg.from) - } - return m - }) + return messages.map(exports.normalizeOutRpcMessage) } diff --git a/test/pubsub.js b/test/pubsub.js index 5691d85607..2c824a7611 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -8,7 +8,10 @@ const expect = chai.expect const series = require('async/series') const parallel = require('async/parallel') +const { Message } = require('../src/message') +const { SignPrefix } = require('../src/message/sign') const PubsubBaseProtocol = require('../src') +const { randomSeqno, normalizeOutRpcMessage } = require('../src/utils') const utils = require('./utils') const createNode = utils.createNode @@ -55,14 +58,7 @@ describe('pubsub base protocol', () => { }) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) - - it('mount the pubsub protocol', (done) => { + before('mount the pubsub protocol', (done) => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) @@ -73,13 +69,20 @@ describe('pubsub base protocol', () => { }, 50) }) - it('start both Pubsub', (done) => { + before('start both Pubsub', (done) => { parallel([ (cb) => psA.start(cb), (cb) => psB.start(cb) ], done) }) + after((done) => { + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], done) + }) + it('Dial from nodeA to nodeB', (done) => { series([ (cb) => nodeA.dial(nodeB.peerInfo, cb), @@ -90,6 +93,29 @@ describe('pubsub base protocol', () => { }, 1000) ], done) }) + + it('_buildMessage normalizes and signs messages', (done) => { + const message = { + from: 'QmABC', + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + psA._buildMessage(message, (err, signedMessage) => { + expect(err).to.not.exist() + + const bytesToSign = Buffer.concat([ + SignPrefix, + Message.encode(normalizeOutRpcMessage(message)) + ]) + + psA.peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => { + expect(verified).to.eql(true) + done(err) + }) + }) + }) }) describe('dial the pubsub protocol on mount', () => { diff --git a/test/sign.spec.js b/test/sign.spec.js new file mode 100644 index 0000000000..e7bbd34e51 --- /dev/null +++ b/test/sign.spec.js @@ -0,0 +1,53 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const { Message } = require('../src/message') +const { signMessage, SignPrefix } = require('../src/message/sign') +const PeerId = require('peer-id') +const { randomSeqno } = require('../src/utils') + +describe('message signing', () => { + let peerId + before((done) => { + peerId = PeerId.create({ + bits: 1024 + }, (err, id) => { + peerId = id + done(err) + }) + }) + + it('should be able to sign a message', (done) => { + const message = { + from: 'QmABC', + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + + peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { + if (err) return done(err) + + signMessage(peerId, message, (err, signedMessage) => { + if (err) return done(err) + + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) + + // Verify the signature + peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => { + expect(verified).to.eql(true) + done(err) + }) + }) + }) + }) +})