From bdbf17697e4484ebea7ab2466e2b1946b68e7e6e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 26 Mar 2020 21:58:20 +0700 Subject: [PATCH 1/3] export qlogs when the QLOGDIR env variable is set --- p2p/transport/quic/buffered_write_closer.go | 25 ++++++++ .../quic/buffered_write_closer_test.go | 26 ++++++++ p2p/transport/quic/conn_test.go | 4 +- p2p/transport/quic/listener.go | 2 +- p2p/transport/quic/transport.go | 62 +++++++++++++++---- 5 files changed, 103 insertions(+), 16 deletions(-) create mode 100644 p2p/transport/quic/buffered_write_closer.go create mode 100644 p2p/transport/quic/buffered_write_closer_test.go diff --git a/p2p/transport/quic/buffered_write_closer.go b/p2p/transport/quic/buffered_write_closer.go new file mode 100644 index 0000000000..aeeef0035a --- /dev/null +++ b/p2p/transport/quic/buffered_write_closer.go @@ -0,0 +1,25 @@ +package libp2pquic + +import ( + "bufio" + "io" +) + +type bufferedWriteCloser struct { + *bufio.Writer + io.Closer +} + +func newBufferedWriteCloser(writer *bufio.Writer, closer io.Closer) io.WriteCloser { + return &bufferedWriteCloser{ + Writer: writer, + Closer: closer, + } +} + +func (h bufferedWriteCloser) Close() error { + if err := h.Writer.Flush(); err != nil { + return err + } + return h.Closer.Close() +} diff --git a/p2p/transport/quic/buffered_write_closer_test.go b/p2p/transport/quic/buffered_write_closer_test.go new file mode 100644 index 0000000000..949e211c22 --- /dev/null +++ b/p2p/transport/quic/buffered_write_closer_test.go @@ -0,0 +1,26 @@ +package libp2pquic + +import ( + "bufio" + "bytes" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +type nopCloser struct{} + +func (nopCloser) Close() error { return nil } + +var _ = Describe("buffered io.WriteCloser", func() { + It("flushes before closing", func() { + buf := &bytes.Buffer{} + + w := bufio.NewWriter(buf) + wc := newBufferedWriteCloser(w, &nopCloser{}) + wc.Write([]byte("foobar")) + Expect(buf.Len()).To(BeZero()) + Expect(wc.Close()).To(Succeed()) + Expect(buf.String()).To(Equal("foobar")) + }) +}) diff --git a/p2p/transport/quic/conn_test.go b/p2p/transport/quic/conn_test.go index d70af9f6e4..61320343ca 100644 --- a/p2p/transport/quic/conn_test.go +++ b/p2p/transport/quic/conn_test.go @@ -185,13 +185,13 @@ var _ = Describe("Connection", func() { Expect(err).ToNot(HaveOccurred()) // make sure that connection attempts fails - clientTransport.(*transport).config.HandshakeTimeout = 250 * time.Millisecond + clientTransport.(*transport).clientConfig.HandshakeTimeout = 250 * time.Millisecond _, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) // now allow the address and make sure the connection goes through - clientTransport.(*transport).config.HandshakeTimeout = 2 * time.Second + clientTransport.(*transport).clientConfig.HandshakeTimeout = 2 * time.Second filters.AddFilter(ipNet, filter.ActionAccept) conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) Expect(err).ToNot(HaveOccurred()) diff --git a/p2p/transport/quic/listener.go b/p2p/transport/quic/listener.go index 422f7be96c..9c77d24bac 100644 --- a/p2p/transport/quic/listener.go +++ b/p2p/transport/quic/listener.go @@ -36,7 +36,7 @@ func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivK conf, _ := identity.ConfigForAny() return conf, nil } - ln, err := quic.Listen(rconn, &tlsConf, t.config) + ln, err := quic.Listen(rconn, &tlsConf, t.serverConfig) if err != nil { return nil, err } diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index fcb6c82ff1..6707aa64b2 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -1,10 +1,15 @@ package libp2pquic import ( + "bufio" + "compress/gzip" "context" "errors" + "fmt" "io" "net" + "os" + "time" "github.com/minio/sha256-simd" "golang.org/x/crypto/hkdf" @@ -87,11 +92,12 @@ func (c *connManager) Dial(network string, raddr *net.UDPAddr) (*reuseConn, erro // The Transport implements the tpt.Transport interface for QUIC connections. type transport struct { - privKey ic.PrivKey - localPeer peer.ID - identity *p2ptls.Identity - connManager *connManager - config *quic.Config + privKey ic.PrivKey + localPeer peer.ID + identity *p2ptls.Identity + connManager *connManager + serverConfig *quic.Config + clientConfig *quic.Config } var _ tpt.Transport = &transport{} @@ -125,13 +131,17 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK, filters *filter.Filters) (tpt.Tr return nil, err } - return &transport{ - privKey: key, - localPeer: localPeer, - identity: identity, - connManager: connManager, - config: config, - }, nil + t := &transport{ + privKey: key, + localPeer: localPeer, + identity: identity, + connManager: connManager, + serverConfig: config, + clientConfig: config.Clone(), + } + t.serverConfig.GetLogWriter = t.GetLogWriterFor("server") + t.clientConfig.GetLogWriter = t.GetLogWriterFor("client") + return t, nil } // Dial dials a new QUIC connection @@ -153,7 +163,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp if err != nil { return nil, err } - sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.config) + sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, t.clientConfig) if err != nil { pconn.DecreaseCount() return nil, err @@ -190,6 +200,32 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp }, nil } +func (t *transport) GetLogWriterFor(role string) func([]byte) io.WriteCloser { + qlogDir := os.Getenv("QLOGDIR") + if len(qlogDir) == 0 { + return nil + } + return func(connID []byte) io.WriteCloser { + // create the QLOGDIR, if it doesn't exist + if _, err := os.Stat(qlogDir); os.IsNotExist(err) { + if err := os.MkdirAll(qlogDir, 0777); err != nil { + log.Errorf("creating the QLOGDIR failed: %s", err) + return nil + } + } + now := time.Now() + t := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%06d", now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), now.Nanosecond()/1000) + filename := fmt.Sprintf("%s/log_%s_%s_%x.qlog.gz", qlogDir, t, role, connID) + f, err := os.Create(filename) + if err != nil { + log.Errorf("unable to create qlog file %s: %s", filename, err) + return nil + } + gz := gzip.NewWriter(f) + return newBufferedWriteCloser(bufio.NewWriter(gz), gz) + } +} + // Don't use mafmt.QUIC as we don't want to dial DNS addresses. Just /ip{4,6}/udp/quic var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_UDP), mafmt.Base(ma.P_QUIC)) From 6e90057f87afef38295631521a8450c4009e028e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 6 Apr 2020 12:32:53 +0700 Subject: [PATCH 2/3] use RFC3339Nano date format for qlog file names --- p2p/transport/quic/transport.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index 6707aa64b2..7c9b959797 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -213,8 +213,7 @@ func (t *transport) GetLogWriterFor(role string) func([]byte) io.WriteCloser { return nil } } - now := time.Now() - t := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%06d", now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), now.Nanosecond()/1000) + t := time.Now().Format(time.RFC3339Nano) filename := fmt.Sprintf("%s/log_%s_%s_%x.qlog.gz", qlogDir, t, role, connID) f, err := os.Create(filename) if err != nil { From 4710507714496fea3a6f03727ccb3573f5d509e2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 6 Apr 2020 12:34:16 +0700 Subject: [PATCH 3/3] simplify the mkdir for the QLOGDIR --- p2p/transport/quic/transport.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index 7c9b959797..059b9f3685 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -207,11 +207,9 @@ func (t *transport) GetLogWriterFor(role string) func([]byte) io.WriteCloser { } return func(connID []byte) io.WriteCloser { // create the QLOGDIR, if it doesn't exist - if _, err := os.Stat(qlogDir); os.IsNotExist(err) { - if err := os.MkdirAll(qlogDir, 0777); err != nil { - log.Errorf("creating the QLOGDIR failed: %s", err) - return nil - } + if err := os.MkdirAll(qlogDir, 0777); err != nil { + log.Errorf("creating the QLOGDIR failed: %s", err) + return nil } t := time.Now().Format(time.RFC3339Nano) filename := fmt.Sprintf("%s/log_%s_%s_%x.qlog.gz", qlogDir, t, role, connID)