Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: file sharing #76

Merged
merged 28 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f32ac67
feat: add file-sharing UI (#4)
youngjoon-lee Jun 13, 2023
6a4eec4
feat: add request-reponse for rust (#5)
youngjoon-lee Jun 18, 2023
6e2598f
feat: add naive request-response for js (#6)
youngjoon-lee Jun 18, 2023
f9eab7a
feat: add log for file response in rust (#7)
youngjoon-lee Jun 18, 2023
c2e5e2e
refactor: rust and js (#8)
youngjoon-lee Jun 24, 2023
77b1e16
revert constant url
youngjoon-lee Jun 24, 2023
80f0963
fix npm build failure
youngjoon-lee Jul 1, 2023
1eed7d5
add file-sharing to go-peer
youngjoon-lee Jun 24, 2023
0f0e4b8
remove unnecessary debug logs
youngjoon-lee Jul 5, 2023
8c3f766
async cleanup function for useEffect
youngjoon-lee Jul 5, 2023
7fbd908
simply if statements
youngjoon-lee Aug 12, 2023
0f46513
support only ProtocolSupport::Outbound for now
youngjoon-lee Aug 12, 2023
b286c4c
fix comment
youngjoon-lee Aug 12, 2023
cb24cc2
ignore unnecessary event
youngjoon-lee Aug 12, 2023
474b2a6
remove unnecessary close()
youngjoon-lee Aug 12, 2023
fa8d8b5
refmt
youngjoon-lee Aug 12, 2023
e1b046d
do not refmt Cargo.toml
youngjoon-lee Aug 12, 2023
b45a786
modularization
youngjoon-lee Aug 12, 2023
4c8be74
Update go-peer/chatroom.go
youngjoon-lee Aug 12, 2023
0f4a0d9
Update go-peer/chatroom.go
youngjoon-lee Aug 12, 2023
6fed448
Update go-peer/chatroom.go
youngjoon-lee Aug 12, 2023
7bf0111
throw error
youngjoon-lee Aug 12, 2023
bb2cc94
undo refmt
youngjoon-lee Aug 16, 2023
1f0bc87
js: unsubscribe topics
youngjoon-lee Aug 16, 2023
174afe7
feat: resolve conflicts with `main` (#9)
thomaseizinger Sep 22, 2023
e8de2c1
add missing file (#10)
youngjoon-lee Sep 22, 2023
1300e37
revert: do not unsubscribe topic
youngjoon-lee Sep 22, 2023
2c516a8
Merge remote-tracking branch 'upstream/main' into file-sharing
youngjoon-lee Sep 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 114 additions & 18 deletions go-peer/chatroom.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import (
"bufio"
"context"
"encoding/binary"
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -19,13 +23,15 @@ type ChatRoom struct {
Messages chan *ChatMessage
SysMessages chan *ChatMessage

ctx context.Context
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription

roomName string
self peer.ID
nick string
}

Expand All @@ -38,25 +44,39 @@ type ChatMessage struct {

// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub topic
topic, err := ps.Join(topicName(roomName))
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(chatTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
sub, err := topic.Subscribe()
chatSub, err := chatTopic.Subscribe()
if err != nil {
return nil, err
}

// join the pubsub fileTopic
fileTopic, err := ps.Join(fileTopicName(roomName))
if err != nil {
return nil, err
}

// and subscribe to it
fileSub, err := fileTopic.Subscribe()
if err != nil {
return nil, err
}

cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
topic: topic,
sub: sub,
self: selfID,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
Expand All @@ -70,23 +90,29 @@ func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickna

// Publish sends a message to the pubsub topic.
func (cr *ChatRoom) Publish(message string) error {
return cr.topic.Publish(cr.ctx, []byte(message))
return cr.chatTopic.Publish(cr.ctx, []byte(message))
}

func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(topicName(cr.roomName))
return cr.ps.ListPeers(chatTopicName(cr.roomName))
}

// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
// readLoop pulls messages from the pubsub chat/file topic and handles them.
func (cr *ChatRoom) readLoop() {
go cr.readChatLoop()
go cr.readFileLoop()
}

// readChatLoop pulls messages from the pubsub chat topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readChatLoop() {
for {
msg, err := cr.sub.Next(cr.ctx)
msg, err := cr.chatSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.self {
if msg.ReceivedFrom == cr.h.ID() {
continue
}
cm := new(ChatMessage)
Expand All @@ -98,6 +124,76 @@ func (cr *ChatRoom) readLoop() {
}
}

func topicName(roomName string) string {
// readFileLoop pulls messages from the pubsub file topic and handles them.
func (cr *ChatRoom) readFileLoop() {
for {
msg, err := cr.fileSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.h.ID() {
continue
}

fileID := msg.Data
fileBody, err := cr.requestFile(msg.GetFrom(), fileID)
if err != nil {
close(cr.Messages)
return
}

cm := new(ChatMessage)
cm.Message = fmt.Sprintf("File: %s (%v bytes) from %s", string(fileID), len(fileBody), msg.GetFrom().String())
cm.SenderID = msg.ID
cm.SenderNick = string(msg.ID[len(msg.ID)-8])
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}

youngjoon-lee marked this conversation as resolved.
Show resolved Hide resolved
// requestFile sends a request to the peer to send the file with the given fileID.
func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {
stream, err := cr.h.NewStream(context.Background(), toPeer, "/universal-connectivity-file/1")
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
defer stream.Close()

reqLen := binary.AppendUvarint([]byte{}, uint64(len(fileID)))
if _, err := stream.Write(reqLen); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if _, err := stream.Write(fileID); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if err := stream.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to close write stream: %w", err)
}

reader := bufio.NewReader(stream)
respLen, err := binary.ReadUvarint(reader)
if err != nil {
return nil, fmt.Errorf("failed to read response length prefix: %w", err)
}
fileBody := make([]byte, respLen)
if _, err := reader.Read(fileBody); err != nil {
return nil, fmt.Errorf("failed to read fileBody from the stream: %w", err)
}
if err := stream.CloseRead(); err != nil {
return nil, fmt.Errorf("failed to close read stream: %w", err)
}

return fileBody, nil
}

youngjoon-lee marked this conversation as resolved.
Show resolved Hide resolved
// chatTopicName returns the name of the pubsub topic for the chat room.
func chatTopicName(roomName string) string {
return roomName
}

youngjoon-lee marked this conversation as resolved.
Show resolved Hide resolved
// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room.
func fileTopicName(roomName string) string {
return fmt.Sprintf("%s-file", roomName)
}
4 changes: 2 additions & 2 deletions go-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult

// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
var routingDiscovery = routing.NewRoutingDiscovery(dht)
routingDiscovery := routing.NewRoutingDiscovery(dht)

discovery.Advertise(ctx, routingDiscovery, rendezvous)

Expand Down Expand Up @@ -198,7 +198,7 @@ func main() {
room := *roomFlag

// join the chat room
cr, err := JoinChatRoom(ctx, ps, h.ID(), nick, room)
cr, err := JoinChatRoom(ctx, h, ps, nick, room)
if err != nil {
panic(err)
}
Expand Down
40 changes: 31 additions & 9 deletions js-peer/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion js-peer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@
"debug": "^4.3.4",
"eslint": "8.35.0",
"eslint-config-next": "13.2.3",
"it-length-prefixed": "^9.0.1",
"it-map": "^3.0.3",
"it-pipe": "^3.0.1",
"libp2p": "^0.45.5",
"next": "13.2.3",
"private-ip": "^3.0.0",
"react": "18.2.0",
"react-dom": "18.2.0",
"typescript": "4.9.5",
"usehooks-ts": "^2.9.1"
"uint8arrays": "^4.0.4",
"usehooks-ts": "^2.9.1",
"uuid": "^9.0.0"
},
"devDependencies": {
"@types/uuid": "^9.0.2",
"autoprefixer": "^10.4.13",
"postcss": "^8.4.21",
"tailwindcss": "^3.2.7"
Expand Down
Loading