Skip to content

Commit

Permalink
error-codes: add implementation for quic, yamux, websockets, webrtc
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 13, 2024
1 parent cd88f97 commit 53d436f
Show file tree
Hide file tree
Showing 26 changed files with 544 additions and 50 deletions.
17 changes: 17 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"fmt"
"io"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -11,6 +12,17 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

type ConnErrorCode uint32

type ConnError struct {
Remote bool
ErrorCode ConnErrorCode
}

func (c *ConnError) Error() string {
return fmt.Sprintf("connection closed: code: %d", c.ErrorCode)
}

// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
Expand All @@ -24,6 +36,11 @@ type Conn interface {
ConnStat
ConnScoper

// CloseWithError closes the connection with errCode. The errCode is sent to the
// peer on a best effort basis. For transports that do not support sending error
// codes on connection close, the behavior is identical to calling Close.
CloseWithError(errCode ConnErrorCode) error

// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
ID() string
Expand Down
29 changes: 29 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
Expand All @@ -11,6 +12,21 @@ import (
// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")

type StreamErrorCode uint32

type StreamError struct {
ErrorCode StreamErrorCode
Remote bool
}

func (s *StreamError) Error() string {
return fmt.Sprintf("stream reset: code: %d", s.ErrorCode)
}

func (s *StreamError) Is(target error) bool {
return target == ErrReset
}

// MuxedStream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
Expand Down Expand Up @@ -61,6 +77,13 @@ type MuxedStream interface {
SetWriteDeadline(time.Time) error
}

type ResetWithErrorer interface {
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer on a best effort basis. For transports that do not support sending
// error codes to remote peer, the behavior is identical to calling Reset
ResetWithError(errCode StreamErrorCode) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
Expand All @@ -86,6 +109,12 @@ type MuxedConn interface {
AcceptStream() (MuxedStream, error)
}

type CloseWithErrorer interface {
// CloseWithError closes the connection with errCode. The errCode is sent
// to the peer.
CloseWithError(errCode ConnErrorCode) error
}

// Multiplexer wraps a net.Conn with a stream multiplexing
// implementation and returns a MuxedConn that supports opening
// multiple streams over the underlying net.Conn
Expand Down
4 changes: 4 additions & 0 deletions core/network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ type Stream interface {

// Scope returns the user's view of this stream's resource scope
Scope() StreamScope

// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer.
ResetWithError(errCode StreamErrorCode) error
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/libp2p/go-nat v0.2.0
github.com/libp2p/go-netroute v0.2.1
github.com/libp2p/go-reuseport v0.4.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9t
github.com/libp2p/go-netroute v0.2.1/go.mod h1:hraioZr0fhBjG0ZRXJJ6Zj2IVEVNx6tDTFQfSmcq7mQ=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ=
github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7 h1:9DQhrYNrteSCiE8EZC1Na9AZNothvTF+NQtbnOjbxzo=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7/go.mod h1:PGP+3py2ZWDKABvqstBZtMnixEHNC7U/odnGylzur5o=
github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q=
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
Expand Down
8 changes: 6 additions & 2 deletions p2p/muxer/yamux/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (c *conn) Close() error {
return c.yamux().Close()
}

func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
return c.yamux().CloseWithError(uint32(errCode))
}

// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
Expand All @@ -32,7 +36,7 @@ func (c *conn) IsClosed() bool {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.yamux().OpenStream(ctx)
if err != nil {
return nil, err
return nil, parseResetError(err)
}

return (*stream)(s), nil
Expand All @@ -41,7 +45,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
return (*stream)(s), parseResetError(err)
}

func (c *conn) yamux() *yamux.Session {
Expand Down
32 changes: 22 additions & 10 deletions p2p/muxer/yamux/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package yamux

import (
"errors"
"time"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -13,22 +14,29 @@ type stream yamux.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
func parseResetError(err error) error {
if err == nil {
return err
}
se := &yamux.StreamError{}
if errors.As(err, &se) {
return &network.StreamError{Remote: se.Remote, ErrorCode: network.StreamErrorCode(se.ErrorCode)}
}
ce := &yamux.GoAwayError{}
if errors.As(err, &ce) {
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
}
return err
}

return n, err
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
return n, parseResetError(err)
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}

return n, err
return n, parseResetError(err)
}

func (s *stream) Close() error {
Expand All @@ -39,6 +47,10 @@ func (s *stream) Reset() error {
return s.yamux().Reset()
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
return s.yamux().ResetWithError(uint32(errCode))
}

func (s *stream) CloseRead() error {
return s.yamux().CloseRead()
}
Expand Down
1 change: 1 addition & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ type mockConn struct {
}

func (m mockConn) Close() error { panic("implement me") }
func (m mockConn) CloseWithError(errCode network.ConnErrorCode) error { panic("implement me") }
func (m mockConn) LocalPeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePublicKey() crypto.PubKey { panic("implement me") }
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (s *stream) Reset() error {
return nil
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
panic("not implemented")
}

func (s *stream) teardown() {
// at this point, no streams are writing.
s.conn.removeStream(s)
Expand Down
8 changes: 8 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,14 @@ func (c connWithMetrics) Close() error {
return c.CapableConn.Close()
}

func (c connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error {
c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok {
return ce.CloseWithError(errCode)
}
return c.CapableConn.Close()
}

func (c connWithMetrics) Stat() network.ConnStats {
if cs, ok := c.CapableConn.(network.ConnStat); ok {
return cs.Stat()
Expand Down
23 changes: 20 additions & 3 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,20 @@ func (c *Conn) ID() string {
// open notifications must finish before we can fire off the close
// notifications).
func (c *Conn) Close() error {
c.closeOnce.Do(c.doClose)
c.closeOnce.Do(func() {
c.doClose(0)
})
return c.err
}

func (c *Conn) doClose() {
func (c *Conn) CloseWithError(errCode network.ConnErrorCode) error {
c.closeOnce.Do(func() {
c.doClose(errCode)
})
return c.err
}

func (c *Conn) doClose(errCode network.ConnErrorCode) {
c.swarm.removeConn(c)

// Prevent new streams from opening.
Expand All @@ -71,7 +80,15 @@ func (c *Conn) doClose() {
c.streams.m = nil
c.streams.Unlock()

c.err = c.conn.Close()
if errCode != 0 {
if ce, ok := c.conn.(network.CloseWithErrorer); ok {
c.err = ce.CloseWithError(errCode)
} else {
c.err = c.conn.Close()
}
} else {
c.err = c.conn.Close()
}

// Send the connectedness event after closing the connection.
// This ensures that both remote connection close and local connection
Expand Down
11 changes: 11 additions & 0 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ func (s *Stream) Reset() error {
return err
}

func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
var err error
if se, ok := s.stream.(network.ResetWithErrorer); ok {
err = se.ResetWithError(errCode)
} else {
err = s.stream.Reset()
}
s.closeAndRemoveStream()
return err
}

func (s *Stream) closeAndRemoveStream() {
s.closeMx.Lock()
defer s.closeMx.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ func (t *transportConn) ConnState() network.ConnectionState {
UsedEarlyMuxerNegotiation: t.usedEarlyMuxerNegotiation,
}
}

func (t *transportConn) CloseWithError(errCode network.ConnErrorCode) error {
if ce, ok := t.MuxedConn.(network.CloseWithErrorer); ok {
return ce.CloseWithError(errCode)
}
return t.Close()
}
Loading

0 comments on commit 53d436f

Please sign in to comment.