Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

deps: update it-length-prefixed and uint8arraylists deps #91

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-pubsub": "^2.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
Expand All @@ -191,11 +191,12 @@
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.0",
"it-length-prefixed": "^7.0.1",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.3",
"p-queue": "^7.2.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand All @@ -204,9 +205,9 @@
"delay": "^5.0.0",
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
"p-wait-for": "^4.1.0",
"protons": "^3.0.4",
"protons-runtime": "^1.0.4",
"p-wait-for": "^5.0.0",
"protons": "^4.0.1",
"protons-runtime": "^2.0.2",
"sinon": "^14.0.0",
"util": "^0.12.4"
}
Expand Down
46 changes: 35 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Queue from 'p-queue'
import { createTopology } from '@libp2p/topology'
import { codes } from './errors.js'
import { PeerStreams as PeerStreamsImpl } from './peer-streams.js'
import { toMessage, ensureArray, randomSeqno, noSignMsgId, msgId, toRpcMessage } from './utils.js'
import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js'
import {
signMessage,
verifySignature
Expand All @@ -17,6 +17,7 @@ import type { Connection } from '@libp2p/interface-connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/components'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:pubsub')

Expand Down Expand Up @@ -284,7 +285,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
/**
* Responsible for processing each RPC message received by other peers.
*/
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8Array>, peerStreams: PeerStreams) {
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams) {
try {
await pipe(
stream,
Expand Down Expand Up @@ -446,6 +447,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
if (msg.type !== 'signed') {
throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

if (msg.sequenceNumber == null) {
throw errcode(new Error('Need seqno when signature policy is StrictSign but it was missing'), codes.ERR_MISSING_SEQNO)
}
Expand Down Expand Up @@ -474,19 +479,19 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
* Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf.
*/
abstract decodeRpc (bytes: Uint8Array): PubSubRPC
abstract decodeRpc (bytes: Uint8Array | Uint8ArrayList): PubSubRPC

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*/
abstract encodeRpc (rpc: PubSubRPC): Uint8Array
abstract encodeRpc (rpc: PubSubRPC): Uint8ArrayList

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*/
abstract encodeMessage (rpc: PubSubRPCMessage): Uint8Array
abstract encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList

/**
* Send an rpc object to a peer
Expand Down Expand Up @@ -523,26 +528,42 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
if (message.type !== 'unsigned') {
throw errcode(new Error('Message type should be "unsigned" when signature policy is StrictNoSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

// @ts-expect-error should not be present
if (message.signature != null) {
throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
}

// @ts-expect-error should not be present
if (message.key != null) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}

// @ts-expect-error should not be present
if (message.sequenceNumber != null) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case 'StrictSign':
if (message.type !== 'signed') {
throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE)
}

if (message.signature == null) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}

if (message.sequenceNumber == null) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
throw errcode(new Error('StrictSigning: Signing required and no sequenceNumber was present'), codes.ERR_MISSING_SEQNO)
}

if (!(await verifySignature(message, this.encodeMessage.bind(this)))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}

break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
Expand All @@ -559,14 +580,16 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
* Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send.
*/
async buildMessage (message: Message) {
async buildMessage (message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }): Promise<Message> {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
message.sequenceNumber = randomSeqno()
return await signMessage(this.components.getPeerId(), message, this.encodeMessage.bind(this))
case 'StrictNoSign':
return await Promise.resolve(message)
return await Promise.resolve({
type: 'unsigned',
...message
})
default:
throw errcode(new Error('Cannot build message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
Expand Down Expand Up @@ -603,10 +626,11 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
throw new Error('Pubsub has not started')
}

const message: Message = {
const message = {
from: this.components.getPeerId(),
topic,
data: data ?? new Uint8Array(0)
data: data ?? new Uint8Array(0),
sequenceNumber: randomSeqno()
}

log('publish topic: %s from: %p data: %m', topic, message.from, message.data)
Expand Down
12 changes: 7 additions & 5 deletions src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import type { Stream } from '@libp2p/interface-connection'
import type { Pushable } from 'it-pushable'
import type { PeerStreamEvents } from '@libp2p/interface-pubsub'
import { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p-pubsub:peer-streams')

Expand All @@ -25,11 +26,11 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Write stream - it's preferable to use the write method
*/
public outboundStream?: Pushable<Uint8Array>
public outboundStream?: Pushable<Uint8ArrayList>
/**
* Read stream
*/
public inboundStream?: AsyncIterable<Uint8Array>
public inboundStream?: AsyncIterable<Uint8ArrayList>
/**
* The raw outbound stream, as retrieved from conn.newStream
*/
Expand Down Expand Up @@ -72,13 +73,13 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
* Send a message to this peer.
* Throws if there is no `stream` to write to available.
*/
write (data: Uint8Array) {
write (data: Uint8Array | Uint8ArrayList) {
if (this.outboundStream == null) {
const id = this.id.toString()
throw new Error('No writable connection to ' + id)
}

this.outboundStream.push(data)
this.outboundStream.push(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
}

/**
Expand Down Expand Up @@ -115,7 +116,8 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
}

this._rawOutboundStream = stream
this.outboundStream = pushable({
this.outboundStream = pushable<Uint8ArrayList>({
objectMode: true,
onEnd: (shouldEmit) => {
// close writable side of the stream
if (this._rawOutboundStream != null && this._rawOutboundStream.reset != null) { // eslint-disable-line @typescript-eslint/prefer-optional-chain
Expand Down
49 changes: 31 additions & 18 deletions src/sign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toRpcMessage } from './utils.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { PubSubRPCMessage, SignedMessage } from '@libp2p/interface-pubsub'
import { peerIdFromKeys } from '@libp2p/peer-id'
import type { Uint8ArrayList } from 'uint8arraylist'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')

/**
* Signs the provided message with the given `peerId`
*/
export async function signMessage (peerId: PeerId, message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) {
// Get the message in bytes, and prepend with the pubsub prefix
const bytes = uint8ArrayConcat([
SignPrefix,
encode(toRpcMessage(message))
])

export async function signMessage (peerId: PeerId, message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList): Promise<SignedMessage> {
if (peerId.privateKey == null) {
throw new Error('Cannot sign message, no private key present')
}
Expand All @@ -26,22 +21,36 @@ export async function signMessage (peerId: PeerId, message: Message, encode: (rp
throw new Error('Cannot sign message, no public key present')
}

const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey)
const signature = await privateKey.sign(bytes)

const outputMessage: Message = {
...message,
signature: signature,
key: peerId.publicKey
// @ts-expect-error signature field is missing, added below
const outputMessage: SignedMessage = {
type: 'signed',
topic: message.topic,
data: message.data,
sequenceNumber: message.sequenceNumber,
from: peerId
}

// Get the message in bytes, and prepend with the pubsub prefix
const bytes = uint8ArrayConcat([
SignPrefix,
encode(toRpcMessage(outputMessage)).subarray()
])

const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey)
outputMessage.signature = await privateKey.sign(bytes)
outputMessage.key = peerId.publicKey

return outputMessage
}

/**
* Verifies the signature of the given message
*/
export async function verifySignature (message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) {
export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList) {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to be verified')
}

if (message.signature == null) {
throw new Error('Message must contain a signature to be verified')
}
Expand All @@ -57,7 +66,7 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC
...toRpcMessage(message),
signature: undefined,
key: undefined
})
}).subarray()
])

// Get the public key
Expand All @@ -72,7 +81,11 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC
* Returns the PublicKey associated with the given message.
* If no valid PublicKey can be retrieved an error will be returned.
*/
export async function messagePublicKey (message: Message) {
export async function messagePublicKey (message: SignedMessage) {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to have a public key')
}

// should be available in the from property of the message (peer id)
if (message.from == null) {
throw new Error('Could not get the public key from the originator id')
Expand Down
32 changes: 24 additions & 8 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,40 @@ export const toMessage = (message: PubSubRPCMessage): Message => {
throw errcode(new Error('RPC message was missing from'), codes.ERR_MISSING_FROM)
}

if (message.sequenceNumber == null || message.from == null || message.signature == null || message.key == null) {
return {
type: 'unsigned',
topic: message.topic ?? '',
data: message.data ?? new Uint8Array(0)
}
}

return {
type: 'signed',
from: peerIdFromBytes(message.from),
topic: message.topic ?? '',
sequenceNumber: message.sequenceNumber == null ? undefined : bigIntFromBytes(message.sequenceNumber),
sequenceNumber: bigIntFromBytes(message.sequenceNumber),
data: message.data ?? new Uint8Array(0),
signature: message.signature ?? undefined,
key: message.key ?? undefined
signature: message.signature,
key: message.key
}
}

export const toRpcMessage = (message: Message): PubSubRPCMessage => {
if (message.type === 'signed') {
return {
from: message.from.multihash.bytes,
data: message.data,
sequenceNumber: bigIntToBytes(message.sequenceNumber),
topic: message.topic,
signature: message.signature,
key: message.key
}
}

return {
from: message.from.multihash.bytes,
data: message.data,
sequenceNumber: message.sequenceNumber == null ? undefined : bigIntToBytes(message.sequenceNumber),
topic: message.topic,
signature: message.signature,
key: message.key
topic: message.topic
}
}

Expand Down
7 changes: 4 additions & 3 deletions test/instance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import { expect } from 'aegir/chai'
import { PubSubBaseProtocol } from '../src/index.js'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { Uint8ArrayList } from 'uint8arraylist'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
throw new Error('Method not implemented.')
}

encodeRpc (rpc: PubSubRPC): Uint8Array {
encodeRpc (rpc: PubSubRPC): Uint8ArrayList {
throw new Error('Method not implemented.')
}

decodeMessage (bytes: Uint8Array): PubSubRPCMessage {
decodeMessage (bytes: Uint8Array | Uint8ArrayList): PubSubRPCMessage {
throw new Error('Method not implemented.')
}

encodeMessage (rpc: PubSubRPCMessage): Uint8Array {
encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList {
throw new Error('Method not implemented.')
}

Expand Down
Loading