Skip to content

Commit

Permalink
Merge pull request #129 from libp2p/qlog
Browse files Browse the repository at this point in the history
export qlogs when the QLOGDIR env variable is set
  • Loading branch information
marten-seemann committed Apr 6, 2020
2 parents d7a0402 + 4710507 commit e957337
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 16 deletions.
25 changes: 25 additions & 0 deletions p2p/transport/quic/buffered_write_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package libp2pquic

import (
"bufio"
"io"
)

type bufferedWriteCloser struct {
*bufio.Writer
io.Closer
}

func newBufferedWriteCloser(writer *bufio.Writer, closer io.Closer) io.WriteCloser {
return &bufferedWriteCloser{
Writer: writer,
Closer: closer,
}
}

func (h bufferedWriteCloser) Close() error {
if err := h.Writer.Flush(); err != nil {
return err
}
return h.Closer.Close()
}
26 changes: 26 additions & 0 deletions p2p/transport/quic/buffered_write_closer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package libp2pquic

import (
"bufio"
"bytes"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

var _ = Describe("buffered io.WriteCloser", func() {
It("flushes before closing", func() {
buf := &bytes.Buffer{}

w := bufio.NewWriter(buf)
wc := newBufferedWriteCloser(w, &nopCloser{})
wc.Write([]byte("foobar"))
Expect(buf.Len()).To(BeZero())
Expect(wc.Close()).To(Succeed())
Expect(buf.String()).To(Equal("foobar"))
})
})
4 changes: 2 additions & 2 deletions p2p/transport/quic/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ var _ = Describe("Connection", func() {
Expect(err).ToNot(HaveOccurred())

// make sure that connection attempts fails
clientTransport.(*transport).config.HandshakeTimeout = 250 * time.Millisecond
clientTransport.(*transport).clientConfig.HandshakeTimeout = 250 * time.Millisecond
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())

// now allow the address and make sure the connection goes through
clientTransport.(*transport).config.HandshakeTimeout = 2 * time.Second
clientTransport.(*transport).clientConfig.HandshakeTimeout = 2 * time.Second
filters.AddFilter(ipNet, filter.ActionAccept)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/quic/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivK
conf, _ := identity.ConfigForAny()
return conf, nil
}
ln, err := quic.Listen(rconn, &tlsConf, t.config)
ln, err := quic.Listen(rconn, &tlsConf, t.serverConfig)
if err != nil {
return nil, err
}
Expand Down
59 changes: 46 additions & 13 deletions p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package libp2pquic

import (
"bufio"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"time"

"github.com/minio/sha256-simd"
"golang.org/x/crypto/hkdf"
Expand Down Expand Up @@ -87,11 +92,12 @@ func (c *connManager) Dial(network string, raddr *net.UDPAddr) (*reuseConn, erro

// The Transport implements the tpt.Transport interface for QUIC connections.
type transport struct {
privKey ic.PrivKey
localPeer peer.ID
identity *p2ptls.Identity
connManager *connManager
config *quic.Config
privKey ic.PrivKey
localPeer peer.ID
identity *p2ptls.Identity
connManager *connManager
serverConfig *quic.Config
clientConfig *quic.Config
}

var _ tpt.Transport = &transport{}
Expand Down Expand Up @@ -125,13 +131,17 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK, filters *filter.Filters) (tpt.Tr
return nil, err
}

return &transport{
privKey: key,
localPeer: localPeer,
identity: identity,
connManager: connManager,
config: config,
}, nil
t := &transport{
privKey: key,
localPeer: localPeer,
identity: identity,
connManager: connManager,
serverConfig: config,
clientConfig: config.Clone(),
}
t.serverConfig.GetLogWriter = t.GetLogWriterFor("server")
t.clientConfig.GetLogWriter = t.GetLogWriterFor("client")
return t, nil
}

// Dial dials a new QUIC connection
Expand All @@ -153,7 +163,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
if err != nil {
return nil, err
}
sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.config)
sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.clientConfig)
if err != nil {
pconn.DecreaseCount()
return nil, err
Expand Down Expand Up @@ -190,6 +200,29 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
}, nil
}

func (t *transport) GetLogWriterFor(role string) func([]byte) io.WriteCloser {
qlogDir := os.Getenv("QLOGDIR")
if len(qlogDir) == 0 {
return nil
}
return func(connID []byte) io.WriteCloser {
// create the QLOGDIR, if it doesn't exist
if err := os.MkdirAll(qlogDir, 0777); err != nil {
log.Errorf("creating the QLOGDIR failed: %s", err)
return nil
}
t := time.Now().Format(time.RFC3339Nano)
filename := fmt.Sprintf("%s/log_%s_%s_%x.qlog.gz", qlogDir, t, role, connID)
f, err := os.Create(filename)
if err != nil {
log.Errorf("unable to create qlog file %s: %s", filename, err)
return nil
}
gz := gzip.NewWriter(f)
return newBufferedWriteCloser(bufio.NewWriter(gz), gz)
}
}

// Don't use mafmt.QUIC as we don't want to dial DNS addresses. Just /ip{4,6}/udp/quic
var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_UDP), mafmt.Base(ma.P_QUIC))

Expand Down

0 comments on commit e957337

Please sign in to comment.