Skip to content

Commit

Permalink
feat: file sharing (#76)
Browse files Browse the repository at this point in the history
Co-authored-by: TheDiscordian <43145244+TheDiscordian@users.noreply.github.com>
Co-authored-by: Chad Nehemiah <chad.nehemiah94@gmail.com>
Co-authored-by: GitHub <noreply@github.com>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
  • Loading branch information
5 people authored Sep 26, 2023
1 parent 7a79d85 commit 062ba90
Show file tree
Hide file tree
Showing 13 changed files with 562 additions and 81 deletions.
132 changes: 114 additions & 18 deletions go-peer/chatroom.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import (
"bufio"
"context"
"encoding/binary"
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -19,13 +23,15 @@ type ChatRoom struct {
Messages chan *ChatMessage
SysMessages chan *ChatMessage

ctx context.Context
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription

roomName string
self peer.ID
nick string
}

Expand All @@ -38,25 +44,39 @@ type ChatMessage struct {

// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub topic
topic, err := ps.Join(topicName(roomName))
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(chatTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
sub, err := topic.Subscribe()
chatSub, err := chatTopic.Subscribe()
if err != nil {
return nil, err
}

// join the pubsub fileTopic
fileTopic, err := ps.Join(fileTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
fileSub, err := fileTopic.Subscribe()
if err != nil {
return nil, err
}

cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
topic: topic,
sub: sub,
self: selfID,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
Expand All @@ -70,23 +90,29 @@ func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickna

// Publish sends a message to the pubsub topic.
func (cr *ChatRoom) Publish(message string) error {
return cr.topic.Publish(cr.ctx, []byte(message))
return cr.chatTopic.Publish(cr.ctx, []byte(message))
}

func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(topicName(cr.roomName))
return cr.ps.ListPeers(chatTopicName(cr.roomName))
}

// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
// readLoop pulls messages from the pubsub chat/file topic and handles them.
func (cr *ChatRoom) readLoop() {
go cr.readChatLoop()
go cr.readFileLoop()
}

// readChatLoop pulls messages from the pubsub chat topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readChatLoop() {
for {
msg, err := cr.sub.Next(cr.ctx)
msg, err := cr.chatSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.self {
if msg.ReceivedFrom == cr.h.ID() {
continue
}
cm := new(ChatMessage)
Expand All @@ -98,6 +124,76 @@ func (cr *ChatRoom) readLoop() {
}
}

func topicName(roomName string) string {
// readFileLoop pulls messages from the pubsub file topic and handles them.
func (cr *ChatRoom) readFileLoop() {
for {
msg, err := cr.fileSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.h.ID() {
continue
}

fileID := msg.Data
fileBody, err := cr.requestFile(msg.GetFrom(), fileID)
if err != nil {
close(cr.Messages)
return
}

cm := new(ChatMessage)
cm.Message = fmt.Sprintf("File: %s (%v bytes) from %s", string(fileID), len(fileBody), msg.GetFrom().String())
cm.SenderID = msg.ID
cm.SenderNick = string(msg.ID[len(msg.ID)-8])
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}

// requestFile sends a request to the peer to send the file with the given fileID.
func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {
stream, err := cr.h.NewStream(context.Background(), toPeer, "/universal-connectivity-file/1")
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
defer stream.Close()

reqLen := binary.AppendUvarint([]byte{}, uint64(len(fileID)))
if _, err := stream.Write(reqLen); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if _, err := stream.Write(fileID); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if err := stream.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to close write stream: %w", err)
}

reader := bufio.NewReader(stream)
respLen, err := binary.ReadUvarint(reader)
if err != nil {
return nil, fmt.Errorf("failed to read response length prefix: %w", err)
}
fileBody := make([]byte, respLen)
if _, err := reader.Read(fileBody); err != nil {
return nil, fmt.Errorf("failed to read fileBody from the stream: %w", err)
}
if err := stream.CloseRead(); err != nil {
return nil, fmt.Errorf("failed to close read stream: %w", err)
}

return fileBody, nil
}

// chatTopicName returns the name of the pubsub topic for the chat room.
func chatTopicName(roomName string) string {
return roomName
}

// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room.
func fileTopicName(roomName string) string {
return fmt.Sprintf("%s-file", roomName)
}
4 changes: 2 additions & 2 deletions go-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult

// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
var routingDiscovery = routing.NewRoutingDiscovery(dht)
routingDiscovery := routing.NewRoutingDiscovery(dht)

discovery.Advertise(ctx, routingDiscovery, rendezvous)

Expand Down Expand Up @@ -198,7 +198,7 @@ func main() {
room := *roomFlag

// join the chat room
cr, err := JoinChatRoom(ctx, ps, h.ID(), nick, room)
cr, err := JoinChatRoom(ctx, h, ps, nick, room)
if err != nil {
panic(err)
}
Expand Down
40 changes: 31 additions & 9 deletions js-peer/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion js-peer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@
"debug": "^4.3.4",
"eslint": "8.35.0",
"eslint-config-next": "13.2.3",
"it-length-prefixed": "^9.0.1",
"it-map": "^3.0.3",
"it-pipe": "^3.0.1",
"libp2p": "^0.45.5",
"next": "13.2.3",
"private-ip": "^3.0.0",
"react": "18.2.0",
"react-dom": "18.2.0",
"typescript": "4.9.5",
"usehooks-ts": "^2.9.1"
"uint8arrays": "^4.0.4",
"usehooks-ts": "^2.9.1",
"uuid": "^9.0.0"
},
"devDependencies": {
"@types/uuid": "^9.0.2",
"autoprefixer": "^10.4.13",
"postcss": "^8.4.21",
"tailwindcss": "^3.2.7"
Expand Down
Loading

1 comment on commit 062ba90

@vercel
Copy link

@vercel vercel bot commented on 062ba90 Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.