diff --git a/go-peer/chatroom.go b/go-peer/chatroom.go index 46629fe9..e2496044 100644 --- a/go-peer/chatroom.go +++ b/go-peer/chatroom.go @@ -1,8 +1,12 @@ package main import ( + "bufio" "context" + "encoding/binary" + "fmt" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -19,13 +23,15 @@ type ChatRoom struct { Messages chan *ChatMessage SysMessages chan *ChatMessage - ctx context.Context - ps *pubsub.PubSub - topic *pubsub.Topic - sub *pubsub.Subscription + ctx context.Context + h host.Host + ps *pubsub.PubSub + chatTopic *pubsub.Topic + chatSub *pubsub.Subscription + fileTopic *pubsub.Topic + fileSub *pubsub.Subscription roomName string - self peer.ID nick string } @@ -38,25 +44,39 @@ type ChatMessage struct { // JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning // a ChatRoom on success. -func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) { - // join the pubsub topic - topic, err := ps.Join(topicName(roomName)) +func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) { + // join the pubsub chatTopic + chatTopic, err := ps.Join(chatTopicName(roomName)) if err != nil { return nil, err } // and subscribe to it - sub, err := topic.Subscribe() + chatSub, err := chatTopic.Subscribe() + if err != nil { + return nil, err + } + + // join the pubsub fileTopic + fileTopic, err := ps.Join(fileTopicName(roomName)) + if err != nil { + return nil, err + } + + // and subscribe to it + fileSub, err := fileTopic.Subscribe() if err != nil { return nil, err } cr := &ChatRoom{ ctx: ctx, + h: h, ps: ps, - topic: topic, - sub: sub, - self: selfID, + chatTopic: chatTopic, + chatSub: chatSub, + fileTopic: fileTopic, + fileSub: fileSub, nick: nickname, roomName: roomName, Messages: make(chan *ChatMessage, ChatRoomBufSize), @@ -70,23 +90,29 @@ func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickna // Publish sends a message to the pubsub topic. func (cr *ChatRoom) Publish(message string) error { - return cr.topic.Publish(cr.ctx, []byte(message)) + return cr.chatTopic.Publish(cr.ctx, []byte(message)) } func (cr *ChatRoom) ListPeers() []peer.ID { - return cr.ps.ListPeers(topicName(cr.roomName)) + return cr.ps.ListPeers(chatTopicName(cr.roomName)) } -// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel. +// readLoop pulls messages from the pubsub chat/file topic and handles them. func (cr *ChatRoom) readLoop() { + go cr.readChatLoop() + go cr.readFileLoop() +} + +// readChatLoop pulls messages from the pubsub chat topic and pushes them onto the Messages channel. +func (cr *ChatRoom) readChatLoop() { for { - msg, err := cr.sub.Next(cr.ctx) + msg, err := cr.chatSub.Next(cr.ctx) if err != nil { close(cr.Messages) return } // only forward messages delivered by others - if msg.ReceivedFrom == cr.self { + if msg.ReceivedFrom == cr.h.ID() { continue } cm := new(ChatMessage) @@ -98,6 +124,76 @@ func (cr *ChatRoom) readLoop() { } } -func topicName(roomName string) string { +// readFileLoop pulls messages from the pubsub file topic and handles them. +func (cr *ChatRoom) readFileLoop() { + for { + msg, err := cr.fileSub.Next(cr.ctx) + if err != nil { + close(cr.Messages) + return + } + // only forward messages delivered by others + if msg.ReceivedFrom == cr.h.ID() { + continue + } + + fileID := msg.Data + fileBody, err := cr.requestFile(msg.GetFrom(), fileID) + if err != nil { + close(cr.Messages) + return + } + + cm := new(ChatMessage) + cm.Message = fmt.Sprintf("File: %s (%v bytes) from %s", string(fileID), len(fileBody), msg.GetFrom().String()) + cm.SenderID = msg.ID + cm.SenderNick = string(msg.ID[len(msg.ID)-8]) + // send valid messages onto the Messages channel + cr.Messages <- cm + } +} + +// requestFile sends a request to the peer to send the file with the given fileID. +func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) { + stream, err := cr.h.NewStream(context.Background(), toPeer, "/universal-connectivity-file/1") + if err != nil { + return nil, fmt.Errorf("failed to create stream: %w", err) + } + defer stream.Close() + + reqLen := binary.AppendUvarint([]byte{}, uint64(len(fileID))) + if _, err := stream.Write(reqLen); err != nil { + return nil, fmt.Errorf("failed to write fileID to the stream: %w", err) + } + if _, err := stream.Write(fileID); err != nil { + return nil, fmt.Errorf("failed to write fileID to the stream: %w", err) + } + if err := stream.CloseWrite(); err != nil { + return nil, fmt.Errorf("failed to close write stream: %w", err) + } + + reader := bufio.NewReader(stream) + respLen, err := binary.ReadUvarint(reader) + if err != nil { + return nil, fmt.Errorf("failed to read response length prefix: %w", err) + } + fileBody := make([]byte, respLen) + if _, err := reader.Read(fileBody); err != nil { + return nil, fmt.Errorf("failed to read fileBody from the stream: %w", err) + } + if err := stream.CloseRead(); err != nil { + return nil, fmt.Errorf("failed to close read stream: %w", err) + } + + return fileBody, nil +} + +// chatTopicName returns the name of the pubsub topic for the chat room. +func chatTopicName(roomName string) string { return roomName } + +// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room. +func fileTopicName(roomName string) string { + return fmt.Sprintf("%s-file", roomName) +} diff --git a/go-peer/main.go b/go-peer/main.go index 82a2bac8..b5682320 100644 --- a/go-peer/main.go +++ b/go-peer/main.go @@ -78,7 +78,7 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult // Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7 func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) { - var routingDiscovery = routing.NewRoutingDiscovery(dht) + routingDiscovery := routing.NewRoutingDiscovery(dht) discovery.Advertise(ctx, routingDiscovery, rendezvous) @@ -198,7 +198,7 @@ func main() { room := *roomFlag // join the chat room - cr, err := JoinChatRoom(ctx, ps, h.ID(), nick, room) + cr, err := JoinChatRoom(ctx, h, ps, nick, room) if err != nil { panic(err) } diff --git a/js-peer/package-lock.json b/js-peer/package-lock.json index 2135e9d8..339f4081 100644 --- a/js-peer/package-lock.json +++ b/js-peer/package-lock.json @@ -27,15 +27,21 @@ "debug": "^4.3.4", "eslint": "8.35.0", "eslint-config-next": "13.2.3", + "it-length-prefixed": "^9.0.1", + "it-map": "^3.0.3", + "it-pipe": "^3.0.1", "libp2p": "^0.45.5", "next": "13.2.3", "private-ip": "^3.0.0", "react": "18.2.0", "react-dom": "18.2.0", "typescript": "4.9.5", - "usehooks-ts": "^2.9.1" + "uint8arrays": "^4.0.4", + "usehooks-ts": "^2.9.1", + "uuid": "^9.0.0" }, "devDependencies": { + "@types/uuid": "^9.0.2", "autoprefixer": "^10.4.13", "postcss": "^8.4.21", "tailwindcss": "^3.2.7" @@ -1872,6 +1878,12 @@ "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.2.tgz", "integrity": "sha512-9GcLXF0/v3t80caGs5p2rRfkB+a8VBGLJZVih6CNFkx8IZ994wiKKLSRs9nuFwk1HevWs/1mnUmkApGrSGsShA==" }, + "node_modules/@types/uuid": { + "version": "9.0.2", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.2.tgz", + "integrity": "sha512-kNnC1GFBLuhImSnV7w4njQkUiJi0ZXUycu1rUaouPqiKlXkh77JKgdRnTAp1x5eBwcIwbtI+3otwzuIDEuDoxQ==", + "dev": true + }, "node_modules/@types/ws": { "version": "8.5.4", "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.4.tgz", @@ -6612,15 +6624,11 @@ } }, "node_modules/uint8arrays": { - "version": "4.0.3", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-4.0.3.tgz", - "integrity": "sha512-b+aKlI2oTnxnfeSQWV1sMacqSNxqhtXySaH6bflvONGxF8V/fT3ZlYH7z2qgGfydsvpVo4JUgM/Ylyfl2YouCg==", + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-4.0.4.tgz", + "integrity": "sha512-AOoA66e/A7zoXm1mgzQjGmkWDTvCrS3ttWXLHFtlVAwMobLcaOA7G7WRNNAcyfjjYdFDtkEK6njRDX7hZLIO9Q==", "dependencies": { "multiformats": "^11.0.0" - }, - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" } }, "node_modules/unbox-primitive": { @@ -6718,6 +6726,14 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "dev": true }, + "node_modules/uuid": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", + "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/varint": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/varint/-/varint-6.0.0.tgz", @@ -6904,15 +6920,21 @@ "debug": "^4.3.4", "eslint": "8.35.0", "eslint-config-next": "13.2.3", + "it-length-prefixed": "^9.0.1", + "it-map": "^3.0.3", + "it-pipe": "^3.0.1", "libp2p": "^0.45.5", "next": "13.2.3", "private-ip": "^3.0.0", "react": "18.2.0", "react-dom": "18.2.0", "typescript": "4.9.5", - "usehooks-ts": "^2.9.1" + "uint8arrays": "^4.0.4", + "usehooks-ts": "^2.9.1", + "uuid": "^9.0.0" }, "devDependencies": { + "@types/uuid": "^9.0.2", "autoprefixer": "^10.4.13", "postcss": "^8.4.21", "tailwindcss": "^3.2.7" diff --git a/js-peer/package.json b/js-peer/package.json index 25e0ba4d..677667cf 100644 --- a/js-peer/package.json +++ b/js-peer/package.json @@ -27,15 +27,21 @@ "debug": "^4.3.4", "eslint": "8.35.0", "eslint-config-next": "13.2.3", + "it-length-prefixed": "^9.0.1", + "it-map": "^3.0.3", + "it-pipe": "^3.0.1", "libp2p": "^0.45.5", "next": "13.2.3", "private-ip": "^3.0.0", "react": "18.2.0", "react-dom": "18.2.0", "typescript": "4.9.5", - "usehooks-ts": "^2.9.1" + "uint8arrays": "^4.0.4", + "usehooks-ts": "^2.9.1", + "uuid": "^9.0.0" }, "devDependencies": { + "@types/uuid": "^9.0.2", "autoprefixer": "^10.4.13", "postcss": "^8.4.21", "tailwindcss": "^3.2.7" diff --git a/js-peer/src/components/chat.tsx b/js-peer/src/components/chat.tsx index 3bdf5b80..ec0188ad 100644 --- a/js-peer/src/components/chat.tsx +++ b/js-peer/src/components/chat.tsx @@ -1,14 +1,20 @@ import { useLibp2pContext } from '@/context/ctx' -import React, { useCallback, useEffect, useState } from 'react' +import React, { useCallback, useEffect, useRef, useState } from 'react' import { Message } from '@libp2p/interface-pubsub' -import { CHAT_TOPIC } from '@/lib/constants' +import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants' import { createIcon } from '@download/blockies' import { ChatMessage, useChatContext } from '../context/chat-ctx' - +import { v4 as uuidv4 } from 'uuid'; +import { ChatFile, useFileChatContext } from '@/context/file-ctx' +import { pipe } from 'it-pipe' +import map from 'it-map' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import * as lp from 'it-length-prefixed' interface MessageProps extends ChatMessage { } -function Message({ msg, from, peerId }: MessageProps) { +function Message({ msg, fileObjectUrl, from, peerId }: MessageProps) { const msgref = React.useRef(null) const { libp2p } = useLibp2pContext() @@ -35,6 +41,7 @@ function Message({ msg, from, peerId }: MessageProps) { >
{msg} +

{fileObjectUrl ? Download : ""}

{peerId !== libp2p.peerId.toString() ? `from: ${peerId.slice(-4)}` : null}

@@ -45,29 +52,98 @@ function Message({ msg, from, peerId }: MessageProps) { export default function ChatContainer() { const { libp2p } = useLibp2pContext() const { messageHistory, setMessageHistory } = useChatContext(); + const { files, setFiles } = useFileChatContext(); const [input, setInput] = useState('') + const fileRef = useRef(null); // Effect hook to subscribe to pubsub events and update the message state hook useEffect(() => { - const messageCB = (evt: CustomEvent) => { + const messageCB = async (evt: CustomEvent) => { console.log('gossipsub console log', evt.detail) // FIXME: Why does 'from' not exist on type 'Message'? const { topic, data } = evt.detail + + switch (topic) { + case CHAT_TOPIC: { + chatMessageCB(evt, topic, data) + break + } + case CHAT_FILE_TOPIC: { + chatFileMessageCB(evt, topic, data) + break + } + default: { + throw new Error(`Unexpected gossipsub topic: ${topic}`) + } + } + } + + const chatMessageCB = (evt: CustomEvent, topic: string, data: Uint8Array) => { const msg = new TextDecoder().decode(data) console.log(`${topic}: ${msg}`) // Append signed messages, otherwise discard if (evt.detail.type === 'signed') { - setMessageHistory([...messageHistory, { msg, from: 'other', peerId: evt.detail.from.toString() }]) + setMessageHistory([...messageHistory, { msg, fileObjectUrl: undefined, from: 'other', peerId: evt.detail.from.toString() }]) + } + } + + const chatFileMessageCB = async (evt: CustomEvent, topic: string, data: Uint8Array) => { + const fileId = new TextDecoder().decode(data) + + // if the message isn't signed, discard it. + if (evt.detail.type !== 'signed') { + return } + const senderPeerId = evt.detail.from; + + const stream = await libp2p.dialProtocol(senderPeerId, FILE_EXCHANGE_PROTOCOL) + await pipe( + [uint8ArrayFromString(fileId)], + (source) => lp.encode(source), + stream, + (source) => lp.decode(source), + async function(source) { + for await (const data of source) { + const body: Uint8Array = data.subarray() + console.log(`request_response: response received: size:${body.length}`) + + const msg: ChatMessage = { + msg: newChatFileMessage(fileId, body), + fileObjectUrl: window.URL.createObjectURL(new Blob([body])), + from: 'other', + peerId: senderPeerId.toString(), + } + setMessageHistory([...messageHistory, msg]) + } + } + ) } libp2p.services.pubsub.addEventListener('message', messageCB) + libp2p.handle(FILE_EXCHANGE_PROTOCOL, ({ stream }) => { + pipe( + stream.source, + (source) => lp.decode(source), + (source) => map(source, async (msg) => { + const fileId = uint8ArrayToString(msg.subarray()) + const file = files.get(fileId)! + return file.body + }), + (source) => lp.encode(source), + stream.sink, + ) + }) + return () => { - // Cleanup handlers 👇 - // libp2p.pubsub.unsubscribe(CHAT_TOPIC) - libp2p.services.pubsub.removeEventListener('message', messageCB) + (async () => { + // Cleanup handlers 👇 + // libp2p.services.pubsub.unsubscribe(CHAT_TOPIC) + // libp2p.services.pubsub.unsubscribe(CHAT_FILE_TOPIC) + libp2p.services.pubsub.removeEventListener('message', messageCB) + await libp2p.unhandle(FILE_EXCHANGE_PROTOCOL) + })(); } }, [libp2p, messageHistory, setMessageHistory]) @@ -75,7 +151,7 @@ export default function ChatContainer() { if (input === '') return console.log( - 'peers in gossip:', + `peers in gossip for topic ${CHAT_TOPIC}:`, libp2p.services.pubsub.getSubscribers(CHAT_TOPIC).toString(), ) @@ -90,10 +166,48 @@ export default function ChatContainer() { const myPeerId = libp2p.peerId.toString() - setMessageHistory([...messageHistory, { msg: input, from: 'me', peerId: myPeerId }]) + setMessageHistory([...messageHistory, { msg: input, fileObjectUrl: undefined, from: 'me', peerId: myPeerId }]) setInput('') }, [input, messageHistory, setInput, libp2p, setMessageHistory]) + const sendFile = useCallback(async (readerEvent: ProgressEvent) => { + const fileBody = readerEvent.target?.result as ArrayBuffer; + + const myPeerId = libp2p.peerId.toString() + const file: ChatFile = { + id: uuidv4(), + body: new Uint8Array(fileBody), + sender: myPeerId, + } + setFiles(files.set(file.id, file)) + + console.log( + `peers in gossip for topic ${CHAT_FILE_TOPIC}:`, + libp2p.services.pubsub.getSubscribers(CHAT_FILE_TOPIC).toString(), + ) + + const res = await libp2p.services.pubsub.publish( + CHAT_FILE_TOPIC, + new TextEncoder().encode(file.id) + ) + console.log( + 'sent file to: ', + res.recipients.map((peerId) => peerId.toString()), + ) + + const msg: ChatMessage = { + msg: newChatFileMessage(file.id, file.body), + fileObjectUrl: window.URL.createObjectURL(new Blob([file.body])), + from: 'me', + peerId: myPeerId, + } + setMessageHistory([...messageHistory, msg]) + }, [messageHistory, libp2p, setMessageHistory]) + + const newChatFileMessage = (id: string, body: Uint8Array) => { + return `File: ${id} (${body.length} bytes)` + } + const handleKeyUp = useCallback( async (e: React.KeyboardEvent) => { if (e.key !== 'Enter') { @@ -118,6 +232,26 @@ export default function ChatContainer() { [setInput], ) + const handleFileInput = useCallback( + (e: React.ChangeEvent) => { + if (e.target.files) { + const reader = new FileReader(); + reader.readAsArrayBuffer(e.target.files[0]); + reader.onload = (readerEvent) => { + sendFile(readerEvent) + }; + } + }, + [sendFile], + ) + + const handleFileSend = useCallback( + async (_e: React.MouseEvent) => { + fileRef?.current?.click(); + }, + [fileRef], + ) + return (
@@ -140,8 +274,8 @@ export default function ChatContainer() {
    {/* messages start */} - {messageHistory.map(({ msg, from, peerId }, idx) => ( - + {messageHistory.map(({ msg, fileObjectUrl, from, peerId }, idx) => ( + ))} {/* messages end */}
@@ -164,7 +298,9 @@ export default function ChatContainer() { /> -