Skip to content

Commit

Permalink
add file-sharing to go-peer
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee committed Jul 2, 2023
1 parent 80f0963 commit 1eed7d5
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 20 deletions.
129 changes: 111 additions & 18 deletions go-peer/chatroom.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import (
"bufio"
"context"
"encoding/binary"
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -19,13 +23,15 @@ type ChatRoom struct {
Messages chan *ChatMessage
SysMessages chan *ChatMessage

ctx context.Context
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription

roomName string
self peer.ID
nick string
}

Expand All @@ -38,25 +44,39 @@ type ChatMessage struct {

// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub topic
topic, err := ps.Join(topicName(roomName))
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(chatTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
sub, err := topic.Subscribe()
chatSub, err := chatTopic.Subscribe()
if err != nil {
return nil, err
}

// join the pubsub fileTopic
fileTopic, err := ps.Join(fileTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
fileSub, err := fileTopic.Subscribe()
if err != nil {
return nil, err
}

cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
topic: topic,
sub: sub,
self: selfID,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
Expand All @@ -70,23 +90,29 @@ func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickna

// Publish sends a message to the pubsub topic.
func (cr *ChatRoom) Publish(message string) error {
return cr.topic.Publish(cr.ctx, []byte(message))
return cr.chatTopic.Publish(cr.ctx, []byte(message))
}

func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(topicName(cr.roomName))
return cr.ps.ListPeers(chatTopicName(cr.roomName))
}

// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
// readLoop pulls messages from the pubsub chat/file topic and handles them.
func (cr *ChatRoom) readLoop() {
go cr.readChatLoop()
go cr.readFileLoop()
}

// readChatLoop pulls messages from the pubsub chat topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readChatLoop() {
for {
msg, err := cr.sub.Next(cr.ctx)
msg, err := cr.chatSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.self {
if msg.ReceivedFrom == cr.h.ID() {
continue
}
cm := new(ChatMessage)
Expand All @@ -98,6 +124,73 @@ func (cr *ChatRoom) readLoop() {
}
}

func topicName(roomName string) string {
// readFileLoop pulls messages from the pubsub file topic and handles them.
func (cr *ChatRoom) readFileLoop() {
for {
msg, err := cr.fileSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.h.ID() {
continue
}

fileID := msg.Data
fileBody, err := cr.requestFile(msg.GetFrom(), fileID)
if err != nil {
close(cr.Messages)
return
}

cm := new(ChatMessage)
cm.Message = fmt.Sprintf("File: %s (%v bytes) from %s", string(fileID), len(fileBody), msg.GetFrom().String())
cm.SenderID = msg.ID
cm.SenderNick = string(msg.ID[len(msg.ID)-8])
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}

func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {
stream, err := cr.h.NewStream(context.Background(), toPeer, "/universal-connectivity-file/1")
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
defer stream.Close()

reqLen := binary.AppendUvarint([]byte{}, uint64(len(fileID)))
if _, err := stream.Write(reqLen); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if _, err := stream.Write(fileID); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if err := stream.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to close write stream: %w", err)
}

reader := bufio.NewReader(stream)
respLen, err := binary.ReadUvarint(reader)
if err != nil {
return nil, fmt.Errorf("failed to read response length prefix: %w", err)
}
fileBody := make([]byte, respLen)
if _, err := reader.Read(fileBody); err != nil {
return nil, fmt.Errorf("failed to read fileBody from the stream: %w", err)
}
if err := stream.CloseRead(); err != nil {
return nil, fmt.Errorf("failed to close read stream: %w", err)
}

return fileBody, nil
}

func chatTopicName(roomName string) string {
return roomName
}

func fileTopicName(roomName string) string {
return fmt.Sprintf("%s-file", roomName)
}
4 changes: 2 additions & 2 deletions go-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult

// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
var routingDiscovery = routing.NewRoutingDiscovery(dht)
routingDiscovery := routing.NewRoutingDiscovery(dht)

discovery.Advertise(ctx, routingDiscovery, rendezvous)

Expand Down Expand Up @@ -198,7 +198,7 @@ func main() {
room := *roomFlag

// join the chat room
cr, err := JoinChatRoom(ctx, ps, h.ID(), nick, room)
cr, err := JoinChatRoom(ctx, h, ps, nick, room)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 1eed7d5

Please sign in to comment.