From aacc91fe0c12f7ac68f8312746f3f29f11b8c380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 22 May 2019 18:28:04 +0100 Subject: [PATCH 01/13] initial commit. Consolidate test utilities, harnesses and suites (#1) --- p2p/transport/testsuite/stream_suite.go | 479 +++++++++++++++++++++ p2p/transport/testsuite/transport_suite.go | 287 ++++++++++++ p2p/transport/testsuite/utils_suite.go | 45 ++ 3 files changed, 811 insertions(+) create mode 100644 p2p/transport/testsuite/stream_suite.go create mode 100644 p2p/transport/testsuite/transport_suite.go create mode 100644 p2p/transport/testsuite/utils_suite.go diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go new file mode 100644 index 0000000000..6f978e57be --- /dev/null +++ b/p2p/transport/testsuite/stream_suite.go @@ -0,0 +1,479 @@ +package ttransport + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "runtime/debug" + "strconv" + "sync" + "testing" + "time" + + crand "crypto/rand" + mrand "math/rand" + + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" + "github.com/libp2p/go-libp2p-testing/race" + + ma "github.com/multiformats/go-multiaddr" +) + +// VerboseDebugging can be set to true to enable verbose debug logging in the +// stream stress tests. +var VerboseDebugging = false + +var randomness []byte + +var StressTestTimeout = 1 * time.Minute + +func init() { + // read 1MB of randomness + randomness = make([]byte, 1<<20) + if _, err := crand.Read(randomness); err != nil { + panic(err) + } + + if timeout := os.Getenv("TEST_STRESS_TIMEOUT_MS"); timeout != "" { + if v, err := strconv.ParseInt(timeout, 10, 32); err == nil { + StressTestTimeout = time.Duration(v) * time.Millisecond + } + } +} + +type Options struct { + connNum int + streamNum int + msgNum int + msgMin int + msgMax int +} + +func fullClose(t *testing.T, s mux.MuxedStream) { + if err := s.Close(); err != nil { + t.Error(err) + s.Reset() + return + } + b, err := ioutil.ReadAll(s) + if err != nil { + t.Error(err) + } + if len(b) != 0 { + t.Error("expected to be done reading") + } +} + +func randBuf(size int) []byte { + n := len(randomness) - size + if size < 1 { + panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness))) + } + + start := mrand.Intn(n) + return randomness[start : start+size] +} + +func checkErr(t *testing.T, err error) { + t.Helper() + if err != nil { + debug.PrintStack() + // TODO: not safe to call in parallel + t.Fatal(err) + } +} + +func debugLog(t *testing.T, s string, args ...interface{}) { + if VerboseDebugging { + t.Logf(s, args...) + } +} + +func echoStream(t *testing.T, s mux.MuxedStream) { + defer s.Close() + // echo everything + var err error + if VerboseDebugging { + t.Logf("accepted stream") + _, err = io.Copy(&logWriter{t, s}, s) + t.Log("closing stream") + } else { + _, err = io.Copy(s, s) // echo everything + } + if err != nil { + t.Error(err) + } +} + +type logWriter struct { + t *testing.T + W io.Writer +} + +func (lw *logWriter) Write(buf []byte) (int, error) { + lw.t.Logf("logwriter: writing %d bytes", len(buf)) + return lw.W.Write(buf) +} + +func goServe(t *testing.T, l transport.Listener) (done func()) { + closed := make(chan struct{}, 1) + + go func() { + for { + c, err := l.Accept() + if err != nil { + select { + case <-closed: + return // closed naturally. + default: + checkErr(t, err) + } + } + + debugLog(t, "accepted connection") + go func() { + for { + str, err := c.AcceptStream() + if err != nil { + break + } + go echoStream(t, str) + } + }() + } + }() + + return func() { + closed <- struct{}{} + } +} + +func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) { + msgsize := 1 << 11 + errs := make(chan error, 0) // dont block anything. + + rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. + rateLimitChan := make(chan struct{}, rateLimitN) + for i := 0; i < rateLimitN; i++ { + rateLimitChan <- struct{}{} + } + + rateLimit := func(f func()) { + <-rateLimitChan + f() + rateLimitChan <- struct{}{} + } + + writeStream := func(s mux.MuxedStream, bufs chan<- []byte) { + debugLog(t, "writeStream %p, %d msgNum", s, opt.msgNum) + + for i := 0; i < opt.msgNum; i++ { + buf := randBuf(msgsize) + bufs <- buf + debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3]) + if _, err := s.Write(buf); err != nil { + errs <- fmt.Errorf("s.Write(buf): %s", err) + continue + } + } + } + + readStream := func(s mux.MuxedStream, bufs <-chan []byte) { + debugLog(t, "readStream %p, %d msgNum", s, opt.msgNum) + + buf2 := make([]byte, msgsize) + i := 0 + for buf1 := range bufs { + i++ + debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + + if _, err := io.ReadFull(s, buf2); err != nil { + errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + continue + } + if !bytes.Equal(buf1, buf2) { + errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + } + } + } + + openStreamAndRW := func(c mux.MuxedConn) { + debugLog(t, "openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum) + + s, err := c.OpenStream() + if err != nil { + errs <- fmt.Errorf("Failed to create NewStream: %s", err) + return + } + + bufs := make(chan []byte, opt.msgNum) + go func() { + writeStream(s, bufs) + close(bufs) + }() + + readStream(s, bufs) + fullClose(t, s) + } + + openConnAndRW := func() { + debugLog(t, "openConnAndRW") + + l, err := ta.Listen(maddr) + checkErr(t, err) + + done := goServe(t, l) + defer done() + + c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) + checkErr(t, err) + + // serve the outgoing conn, because some muxers assume + // that we _always_ call serve. (this is an error?) + go func() { + debugLog(t, "serving connection") + for { + str, err := c.AcceptStream() + if err != nil { + break + } + go echoStream(t, str) + } + }() + + var wg sync.WaitGroup + for i := 0; i < opt.streamNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openStreamAndRW(c) + }) + } + wg.Wait() + c.Close() + } + + openConnsAndRW := func() { + debugLog(t, "openConnsAndRW, %d conns", opt.connNum) + + var wg sync.WaitGroup + for i := 0; i < opt.connNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openConnAndRW() + }) + } + wg.Wait() + } + + go func() { + openConnsAndRW() + close(errs) // done + }() + + for err := range errs { + t.Error(err) + } + +} + +func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + l, err := ta.Listen(maddr) + checkErr(t, err) + defer l.Close() + + count := 10000 + workers := 5 + + if race.WithRace() { + // the race detector can only deal with 8128 simultaneous goroutines, so let's make sure we don't go overboard. + count = 1000 + } + + var ( + connA, connB transport.CapableConn + ) + + accepted := make(chan error, 1) + go func() { + var err error + connA, err = l.Accept() + accepted <- err + }() + connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA) + checkErr(t, err) + checkErr(t, <-accepted) + + defer func() { + if connA != nil { + connA.Close() + } + if connB != nil { + connB.Close() + } + }() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < workers; j++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + s, err := connA.OpenStream() + if err != nil { + t.Error(err) + return + } + wg.Add(1) + go func() { + defer wg.Done() + fullClose(t, s) + }() + } + }() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < count*workers; i++ { + str, err := connB.AcceptStream() + if err != nil { + break + } + wg.Add(1) + go func() { + defer wg.Done() + fullClose(t, str) + }() + } + }() + + timeout := time.After(StressTestTimeout) + done := make(chan struct{}) + + go func() { + wg.Wait() + close(done) + }() + + select { + case <-timeout: + t.Fatal("timed out receiving streams") + case <-done: + } +} + +func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + l, err := ta.Listen(maddr) + checkErr(t, err) + + done := make(chan struct{}, 2) + go func() { + muxa, err := l.Accept() + checkErr(t, err) + + s, err := muxa.OpenStream() + if err != nil { + panic(err) + } + + // Some transports won't open the stream until we write. That's + // fine. + s.Write([]byte("foo")) + + time.Sleep(time.Millisecond * 50) + + _, err = s.Write([]byte("bar")) + if err == nil { + t.Error("should have failed to write") + } + + s.Close() + done <- struct{}{} + }() + + muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) + checkErr(t, err) + + go func() { + str, err := muxb.AcceptStream() + checkErr(t, err) + str.Reset() + done <- struct{}{} + }() + + <-done + <-done +} + +func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 1, + streamNum: 1, + msgNum: 1, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 1, + streamNum: 1, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 50, + streamNum: 10, + msgNum: 50, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 1, + streamNum: 1000, + msgNum: 10, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + SubtestStress(t, ta, tb, maddr, peerA, Options{ + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 10000, + msgMin: 1000, + }) +} diff --git a/p2p/transport/testsuite/transport_suite.go b/p2p/transport/testsuite/transport_suite.go new file mode 100644 index 0000000000..213cfe936c --- /dev/null +++ b/p2p/transport/testsuite/transport_suite.go @@ -0,0 +1,287 @@ +package ttransport + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "sync" + "testing" + + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" + + ma "github.com/multiformats/go-multiaddr" +) + +var testData = []byte("this is some test data") + +type streamAndConn struct { + stream mux.MuxedStream + conn transport.CapableConn +} + +func SubtestProtocols(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4") + if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) { + t.Error("nothing should be able to dial raw IP") + } + + tprotos := make(map[int]bool) + for _, p := range ta.Protocols() { + tprotos[p] = true + } + + if !ta.Proxy() { + protos := maddr.Protocols() + proto := protos[len(protos)-1] + if !tprotos[proto.Code] { + t.Errorf("transport should have reported that it supports protocol '%s' (%d)", proto.Name, proto.Code) + } + } else { + found := false + for _, proto := range maddr.Protocols() { + if tprotos[proto.Code] { + found = true + break + } + } + if !found { + t.Errorf("didn't find any matching proxy protocols in maddr: %s", maddr) + } + } +} + +func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + list, err := ta.Listen(maddr) + if err != nil { + t.Fatal(err) + } + defer list.Close() + + var ( + connA, connB transport.CapableConn + done = make(chan struct{}) + ) + defer func() { + <-done + if connA != nil { + connA.Close() + } + if connB != nil { + connB.Close() + } + }() + + go func() { + defer close(done) + var err error + connB, err = list.Accept() + if err != nil { + t.Error(err) + return + } + s, err := connB.AcceptStream() + if err != nil { + t.Error(err) + return + } + + buf, err := ioutil.ReadAll(s) + if err != nil { + t.Error(err) + return + } + + if !bytes.Equal(testData, buf) { + t.Errorf("expected %s, got %s", testData, buf) + } + + n, err := s.Write(testData) + if err != nil { + t.Error(err) + return + } + if n != len(testData) { + t.Error(err) + return + } + + err = s.Close() + if err != nil { + t.Error(err) + } + }() + + if !tb.CanDial(list.Multiaddr()) { + t.Error("CanDial should have returned true") + } + + connA, err = tb.Dial(ctx, list.Multiaddr(), peerA) + if err != nil { + t.Fatal(err) + } + + s, err := connA.OpenStream() + if err != nil { + t.Fatal(err) + } + + n, err := s.Write(testData) + if err != nil { + t.Fatal(err) + return + } + + if n != len(testData) { + t.Fatalf("failed to write enough data (a->b)") + return + } + err = s.Close() + if err != nil { + t.Fatal(err) + return + } + + buf, err := ioutil.ReadAll(s) + if err != nil { + t.Fatal(err) + return + } + if !bytes.Equal(testData, buf) { + t.Errorf("expected %s, got %s", testData, buf) + } +} + +func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + streams := 100 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + list, err := ta.Listen(maddr) + if err != nil { + t.Fatal(err) + } + defer list.Close() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + c, err := list.Accept() + if err != nil { + t.Error(err) + return + } + defer c.Close() + + var sWg sync.WaitGroup + for i := 0; i < streams; i++ { + s, err := c.AcceptStream() + if err != nil { + t.Error(err) + return + } + + sWg.Add(1) + go func() { + defer sWg.Done() + + data, err := ioutil.ReadAll(s) + if err != nil { + s.Reset() + t.Error(err) + return + } + if !bytes.HasPrefix(data, testData) { + t.Errorf("expected %q to have prefix %q", string(data), string(testData)) + } + + n, err := s.Write(data) + if err != nil { + s.Reset() + t.Error(err) + return + } + + if n != len(data) { + s.Reset() + t.Error(err) + return + } + s.Close() + }() + } + sWg.Wait() + }() + + if !tb.CanDial(list.Multiaddr()) { + t.Error("CanDial should have returned true") + } + + c, err := tb.Dial(ctx, list.Multiaddr(), peerA) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + for i := 0; i < streams; i++ { + s, err := c.OpenStream() + if err != nil { + t.Error(err) + continue + } + + wg.Add(1) + go func(i int) { + defer wg.Done() + data := []byte(fmt.Sprintf("%s - %d", testData, i)) + n, err := s.Write(data) + if err != nil { + s.Reset() + t.Error(err) + return + } + + if n != len(data) { + s.Reset() + t.Error("failed to write enough data (a->b)") + return + } + s.Close() + + ret, err := ioutil.ReadAll(s) + if err != nil { + s.Reset() + t.Error(err) + return + } + if !bytes.Equal(data, ret) { + t.Errorf("expected %q, got %q", string(data), string(ret)) + } + }(i) + } + wg.Wait() +} + +func SubtestCancel(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + list, err := ta.Listen(maddr) + if err != nil { + t.Fatal(err) + } + defer list.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + c, err := tb.Dial(ctx, list.Multiaddr(), peerA) + if err == nil { + c.Close() + t.Fatal("dial should have failed") + } +} diff --git a/p2p/transport/testsuite/utils_suite.go b/p2p/transport/testsuite/utils_suite.go new file mode 100644 index 0000000000..1d520ff262 --- /dev/null +++ b/p2p/transport/testsuite/utils_suite.go @@ -0,0 +1,45 @@ +package ttransport + +import ( + "reflect" + "runtime" + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" + + ma "github.com/multiformats/go-multiaddr" +) + +var Subtests = []func(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID){ + SubtestProtocols, + SubtestBasic, + SubtestCancel, + SubtestPingPong, + + // Stolen from the stream muxer test suite. + SubtestStress1Conn1Stream1Msg, + SubtestStress1Conn1Stream100Msg, + SubtestStress1Conn100Stream100Msg, + SubtestStress50Conn10Stream50Msg, + SubtestStress1Conn1000Stream10Msg, + SubtestStress1Conn100Stream100Msg10MB, + SubtestStreamOpenStress, + SubtestStreamReset, +} + +func getFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} + +func SubtestTransport(t *testing.T, ta, tb transport.Transport, addr string, peerA peer.ID) { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + t.Fatal(err) + } + for _, f := range Subtests { + t.Run(getFunctionName(f), func(t *testing.T) { + f(t, ta, tb, maddr, peerA) + }) + } +} From 2857d54b8012ccc9459baf618235fc5d18f7b7ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 23 May 2019 18:45:35 +0100 Subject: [PATCH 02/13] SubtestPingPong: ensure connections are closed. https://github.com/libp2p/go-libp2p-transport/pull/51 --- p2p/transport/testsuite/transport_suite.go | 23 ++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/p2p/transport/testsuite/transport_suite.go b/p2p/transport/testsuite/transport_suite.go index 213cfe936c..eef75bfbca 100644 --- a/p2p/transport/testsuite/transport_suite.go +++ b/p2p/transport/testsuite/transport_suite.go @@ -169,21 +169,33 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd } defer list.Close() + var ( + connA, connB transport.CapableConn + ) + defer func() { + if connA != nil { + connA.Close() + } + if connB != nil { + connB.Close() + } + }() + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - c, err := list.Accept() + var err error + connA, err = list.Accept() if err != nil { t.Error(err) return } - defer c.Close() var sWg sync.WaitGroup for i := 0; i < streams; i++ { - s, err := c.AcceptStream() + s, err := connA.AcceptStream() if err != nil { t.Error(err) return @@ -225,14 +237,13 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd t.Error("CanDial should have returned true") } - c, err := tb.Dial(ctx, list.Multiaddr(), peerA) + connB, err = tb.Dial(ctx, list.Multiaddr(), peerA) if err != nil { t.Fatal(err) } - defer c.Close() for i := 0; i < streams; i++ { - s, err := c.OpenStream() + s, err := connB.OpenStream() if err != nil { t.Error(err) continue From c18bd50d82e835a7e8cf0d9e9b9c242ffe516441 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Tue, 29 Oct 2019 11:38:35 +0000 Subject: [PATCH 03/13] Allow custom SubtestStress. (#7) SubtestStress is exposed but it was not possible to build a working opt parameters. --- p2p/transport/testsuite/stream_suite.go | 92 ++++++++++++------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index 6f978e57be..e04deaeb62 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -47,11 +47,11 @@ func init() { } type Options struct { - connNum int - streamNum int - msgNum int - msgMin int - msgMax int + ConnNum int + StreamNum int + MsgNum int + MsgMin int + MsgMax int } func fullClose(t *testing.T, s mux.MuxedStream) { @@ -170,12 +170,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } writeStream := func(s mux.MuxedStream, bufs chan<- []byte) { - debugLog(t, "writeStream %p, %d msgNum", s, opt.msgNum) + debugLog(t, "writeStream %p, %d MsgNum", s, opt.MsgNum) - for i := 0; i < opt.msgNum; i++ { + for i := 0; i < opt.MsgNum; i++ { buf := randBuf(msgsize) bufs <- buf - debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3]) + debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3]) if _, err := s.Write(buf); err != nil { errs <- fmt.Errorf("s.Write(buf): %s", err) continue @@ -184,17 +184,17 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } readStream := func(s mux.MuxedStream, bufs <-chan []byte) { - debugLog(t, "readStream %p, %d msgNum", s, opt.msgNum) + debugLog(t, "readStream %p, %d MsgNum", s, opt.MsgNum) buf2 := make([]byte, msgsize) i := 0 for buf1 := range bufs { i++ - debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) - debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) continue } if !bytes.Equal(buf1, buf2) { @@ -204,7 +204,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } openStreamAndRW := func(c mux.MuxedConn) { - debugLog(t, "openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum) + debugLog(t, "openStreamAndRW %p, %d opt.MsgNum", c, opt.MsgNum) s, err := c.OpenStream() if err != nil { @@ -212,7 +212,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, return } - bufs := make(chan []byte, opt.msgNum) + bufs := make(chan []byte, opt.MsgNum) go func() { writeStream(s, bufs) close(bufs) @@ -248,7 +248,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, }() var wg sync.WaitGroup - for i := 0; i < opt.streamNum; i++ { + for i := 0; i < opt.StreamNum; i++ { wg.Add(1) go rateLimit(func() { defer wg.Done() @@ -260,10 +260,10 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } openConnsAndRW := func() { - debugLog(t, "openConnsAndRW, %d conns", opt.connNum) + debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) var wg sync.WaitGroup - for i := 0; i < opt.connNum; i++ { + for i := 0; i < opt.ConnNum; i++ { wg.Add(1) go rateLimit(func() { defer wg.Done() @@ -420,60 +420,60 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 1, - streamNum: 1, - msgNum: 1, - msgMax: 100, - msgMin: 100, + ConnNum: 1, + StreamNum: 1, + MsgNum: 1, + MsgMax: 100, + MsgMin: 100, }) } func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 1, - streamNum: 1, - msgNum: 100, - msgMax: 100, - msgMin: 100, + ConnNum: 1, + StreamNum: 1, + MsgNum: 100, + MsgMax: 100, + MsgMin: 100, }) } func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 1, - streamNum: 100, - msgNum: 100, - msgMax: 100, - msgMin: 100, + ConnNum: 1, + StreamNum: 100, + MsgNum: 100, + MsgMax: 100, + MsgMin: 100, }) } func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 50, - streamNum: 10, - msgNum: 50, - msgMax: 100, - msgMin: 100, + ConnNum: 50, + StreamNum: 10, + MsgNum: 50, + MsgMax: 100, + MsgMin: 100, }) } func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 1, - streamNum: 1000, - msgNum: 10, - msgMax: 100, - msgMin: 100, + ConnNum: 1, + StreamNum: 1000, + MsgNum: 10, + MsgMax: 100, + MsgMin: 100, }) } func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { SubtestStress(t, ta, tb, maddr, peerA, Options{ - connNum: 1, - streamNum: 100, - msgNum: 100, - msgMax: 10000, - msgMin: 1000, + ConnNum: 1, + StreamNum: 100, + MsgNum: 100, + MsgMax: 10000, + MsgMin: 1000, }) } From 59f8133627fca5df680d003763bab598a600ab17 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 17 Dec 2020 16:42:55 +0700 Subject: [PATCH 04/13] pass contexts to OpenStream in tests --- p2p/transport/testsuite/stream_suite.go | 6 +++--- p2p/transport/testsuite/transport_suite.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index e04deaeb62..f7172b4636 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -206,7 +206,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, openStreamAndRW := func(c mux.MuxedConn) { debugLog(t, "openStreamAndRW %p, %d opt.MsgNum", c, opt.MsgNum) - s, err := c.OpenStream() + s, err := c.OpenStream(context.Background()) if err != nil { errs <- fmt.Errorf("Failed to create NewStream: %s", err) return @@ -329,7 +329,7 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma. go func() { defer wg.Done() for i := 0; i < count; i++ { - s, err := connA.OpenStream() + s, err := connA.OpenStream(context.Background()) if err != nil { t.Error(err) return @@ -384,7 +384,7 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi muxa, err := l.Accept() checkErr(t, err) - s, err := muxa.OpenStream() + s, err := muxa.OpenStream(context.Background()) if err != nil { panic(err) } diff --git a/p2p/transport/testsuite/transport_suite.go b/p2p/transport/testsuite/transport_suite.go index eef75bfbca..234c43c495 100644 --- a/p2p/transport/testsuite/transport_suite.go +++ b/p2p/transport/testsuite/transport_suite.go @@ -126,7 +126,7 @@ func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, t.Fatal(err) } - s, err := connA.OpenStream() + s, err := connA.OpenStream(context.Background()) if err != nil { t.Fatal(err) } @@ -243,7 +243,7 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd } for i := 0; i < streams; i++ { - s, err := connB.OpenStream() + s, err := connB.OpenStream(context.Background()) if err != nil { t.Error(err) continue From d1d34d880272ce16d9fc5758ff4d90632111de9e Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Sat, 1 May 2021 16:52:33 +0200 Subject: [PATCH 05/13] Defer closing stream for reading --- p2p/transport/testsuite/stream_suite.go | 5 ++++- p2p/transport/testsuite/transport_suite.go | 19 ++++++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index f7172b4636..b1db0bc640 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -55,7 +55,7 @@ type Options struct { } func fullClose(t *testing.T, s mux.MuxedStream) { - if err := s.Close(); err != nil { + if err := s.CloseWrite(); err != nil { t.Error(err) s.Reset() return @@ -67,6 +67,9 @@ func fullClose(t *testing.T, s mux.MuxedStream) { if len(b) != 0 { t.Error("expected to be done reading") } + if err := s.Close(); err != nil { + t.Error(err) + } } func randBuf(size int) []byte { diff --git a/p2p/transport/testsuite/transport_suite.go b/p2p/transport/testsuite/transport_suite.go index 234c43c495..0b57f00a0a 100644 --- a/p2p/transport/testsuite/transport_suite.go +++ b/p2p/transport/testsuite/transport_suite.go @@ -141,8 +141,8 @@ func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, t.Fatalf("failed to write enough data (a->b)") return } - err = s.Close() - if err != nil { + + if err = s.CloseWrite(); err != nil { t.Fatal(err) return } @@ -155,6 +155,11 @@ func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, if !bytes.Equal(testData, buf) { t.Errorf("expected %s, got %s", testData, buf) } + + if err = s.Close(); err != nil { + t.Fatal(err) + return + } } func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { @@ -265,7 +270,10 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd t.Error("failed to write enough data (a->b)") return } - s.Close() + if err = s.CloseWrite(); err != nil { + t.Fatal(err) + return + } ret, err := ioutil.ReadAll(s) if err != nil { @@ -276,6 +284,11 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd if !bytes.Equal(data, ret) { t.Errorf("expected %q, got %q", string(data), string(ret)) } + + if err = s.Close(); err != nil { + t.Fatal(err) + return + } }(i) } wg.Wait() From 6ebf948a1aa826ff67bc51a642c14522b256d03c Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Mon, 19 Jul 2021 15:33:38 +0100 Subject: [PATCH 06/13] Address `go vet` and `saticcheck` issues Run `go mod tidy` Resolve staticcheck issues: - SA2002 `t.Fatal` must be called in the same goroutine as the test - U1000 unused struct - ST1005 error string should not be capitalized The `go vet` issues were the same as the ones reported by `saticcheck`. Relates to: - https://github.com/orgs/ipfs/projects/12#card-58209321 --- p2p/transport/testsuite/stream_suite.go | 4 ++-- p2p/transport/testsuite/transport_suite.go | 10 ++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index b1db0bc640..00bca4f721 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -158,7 +158,7 @@ func goServe(t *testing.T, l transport.Listener) (done func()) { func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) { msgsize := 1 << 11 - errs := make(chan error, 0) // dont block anything. + errs := make(chan error) // dont block anything. rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. rateLimitChan := make(chan struct{}, rateLimitN) @@ -211,7 +211,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, s, err := c.OpenStream(context.Background()) if err != nil { - errs <- fmt.Errorf("Failed to create NewStream: %s", err) + errs <- fmt.Errorf("failed to create NewStream: %s", err) return } diff --git a/p2p/transport/testsuite/transport_suite.go b/p2p/transport/testsuite/transport_suite.go index 0b57f00a0a..6e6b300543 100644 --- a/p2p/transport/testsuite/transport_suite.go +++ b/p2p/transport/testsuite/transport_suite.go @@ -8,7 +8,6 @@ import ( "sync" "testing" - "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" @@ -17,11 +16,6 @@ import ( var testData = []byte("this is some test data") -type streamAndConn struct { - stream mux.MuxedStream - conn transport.CapableConn -} - func SubtestProtocols(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4") if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) { @@ -271,7 +265,7 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd return } if err = s.CloseWrite(); err != nil { - t.Fatal(err) + t.Error(err) return } @@ -286,7 +280,7 @@ func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiadd } if err = s.Close(); err != nil { - t.Fatal(err) + t.Error(err) return } }(i) From 75a6c48ffca1e7ed6826b98f8beb5e137a3a2902 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 22 Jul 2021 17:59:03 -0700 Subject: [PATCH 07/13] fix: cleanup transport suite --- p2p/transport/testsuite/stream_suite.go | 206 ++++++++++++------------ 1 file changed, 107 insertions(+), 99 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index 00bca4f721..e543999268 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "os" - "runtime/debug" "strconv" "sync" "testing" @@ -82,15 +81,6 @@ func randBuf(size int) []byte { return randomness[start : start+size] } -func checkErr(t *testing.T, err error) { - t.Helper() - if err != nil { - debug.PrintStack() - // TODO: not safe to call in parallel - t.Fatal(err) - } -} - func debugLog(t *testing.T, s string, args ...interface{}) { if VerboseDebugging { t.Logf(s, args...) @@ -98,7 +88,6 @@ func debugLog(t *testing.T, s string, args ...interface{}) { } func echoStream(t *testing.T, s mux.MuxedStream) { - defer s.Close() // echo everything var err error if VerboseDebugging { @@ -123,42 +112,45 @@ func (lw *logWriter) Write(buf []byte) (int, error) { return lw.W.Write(buf) } -func goServe(t *testing.T, l transport.Listener) (done func()) { - closed := make(chan struct{}, 1) +func echo(t *testing.T, c transport.CapableConn) { + var wg sync.WaitGroup + defer wg.Wait() + for { + str, err := c.AcceptStream() + if err != nil { + break + } + wg.Add(1) + go func() { + defer wg.Done() + defer str.Close() + echoStream(t, str) + }() + } +} - go func() { - for { - c, err := l.Accept() - if err != nil { - select { - case <-closed: - return // closed naturally. - default: - checkErr(t, err) - } - } +func serve(t *testing.T, l transport.Listener) { + var wg sync.WaitGroup + defer wg.Wait() - debugLog(t, "accepted connection") - go func() { - for { - str, err := c.AcceptStream() - if err != nil { - break - } - go echoStream(t, str) - } - }() + for { + c, err := l.Accept() + if err != nil { + return } - }() - return func() { - closed <- struct{}{} + wg.Add(1) + debugLog(t, "accepted connection") + go func() { + defer wg.Done() + defer c.Close() + echo(t, c) + }() } } func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) { msgsize := 1 << 11 - errs := make(chan error) // dont block anything. rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. rateLimitChan := make(chan struct{}, rateLimitN) @@ -180,7 +172,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, bufs <- buf debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3]) if _, err := s.Write(buf); err != nil { - errs <- fmt.Errorf("s.Write(buf): %s", err) + t.Errorf("s.Write(buf): %s", err) continue } } @@ -196,12 +188,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + t.Errorf("io.ReadFull(s, buf2): %s", err) debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) } } } @@ -211,7 +203,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, s, err := c.OpenStream(context.Background()) if err != nil { - errs <- fmt.Errorf("failed to create NewStream: %s", err) + t.Errorf("failed to create NewStream: %s", err) return } @@ -228,68 +220,67 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, openConnAndRW := func() { debugLog(t, "openConnAndRW") + var wg sync.WaitGroup + defer wg.Wait() + l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Error(err) + return + } + defer l.Close() - done := goServe(t, l) - defer done() + wg.Add(1) + go func() { + defer wg.Done() + serve(t, l) + }() c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) + if err != nil { + t.Error(err) + return + } // serve the outgoing conn, because some muxers assume // that we _always_ call serve. (this is an error?) + wg.Add(1) go func() { + defer wg.Done() + defer c.Close() debugLog(t, "serving connection") - for { - str, err := c.AcceptStream() - if err != nil { - break - } - go echoStream(t, str) - } + echo(t, c) }() - var wg sync.WaitGroup + var openWg sync.WaitGroup for i := 0; i < opt.StreamNum; i++ { - wg.Add(1) + openWg.Add(1) go rateLimit(func() { - defer wg.Done() + defer openWg.Done() openStreamAndRW(c) }) } - wg.Wait() - c.Close() - } - - openConnsAndRW := func() { - debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) - - var wg sync.WaitGroup - for i := 0; i < opt.ConnNum; i++ { - wg.Add(1) - go rateLimit(func() { - defer wg.Done() - openConnAndRW() - }) - } - wg.Wait() + openWg.Wait() } - go func() { - openConnsAndRW() - close(errs) // done - }() + debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) - for err := range errs { - t.Error(err) + var wg sync.WaitGroup + defer wg.Wait() + for i := 0; i < opt.ConnNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openConnAndRW() + }) } - } func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Fatal(err) + } defer l.Close() count := 10000 @@ -311,8 +302,13 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma. accepted <- err }() connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) - checkErr(t, <-accepted) + if err != nil { + t.Fatal(err) + } + err = <-accepted + if err != nil { + t.Fatal(err) + } defer func() { if connA != nil { @@ -379,22 +375,36 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma. } func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + var wg sync.WaitGroup + defer wg.Wait() + l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Fatal(err) + } + defer l.Close() - done := make(chan struct{}, 2) + wg.Add(1) go func() { + defer wg.Done() + muxa, err := l.Accept() - checkErr(t, err) + if err != nil { + t.Error(err) + return + } + defer muxa.Close() s, err := muxa.OpenStream(context.Background()) if err != nil { - panic(err) + t.Error(err) + return } + defer s.Close() // Some transports won't open the stream until we write. That's // fine. - s.Write([]byte("foo")) + _, _ = s.Write([]byte("foo")) time.Sleep(time.Millisecond * 50) @@ -403,22 +413,20 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi t.Error("should have failed to write") } - s.Close() - done <- struct{}{} }() muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) - - go func() { - str, err := muxb.AcceptStream() - checkErr(t, err) - str.Reset() - done <- struct{}{} - }() + if err != nil { + t.Fatal(err) + } + defer muxb.Close() - <-done - <-done + str, err := muxb.AcceptStream() + if err != nil { + t.Error(err) + return + } + str.Reset() } func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { From ea4a94069b0fb92eacbf43d62a5833fd6dbca1ec Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 24 Jul 2021 22:15:17 +0200 Subject: [PATCH 08/13] fix deadlock in the transport's serve function We don't close the connection before the echo hasn't returned, but echo won't return before AcceptStream has returned an error, which only happens when the connection is closed. --- p2p/transport/testsuite/stream_suite.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index e543999268..6d95ac722d 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -138,12 +138,12 @@ func serve(t *testing.T, l transport.Listener) { if err != nil { return } + defer c.Close() wg.Add(1) debugLog(t, "accepted connection") go func() { defer wg.Done() - defer c.Close() echo(t, c) }() } From f3dcaf7506ef81a69e26b9f09f813c5b22c769df Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 25 Jul 2021 10:00:08 +0200 Subject: [PATCH 09/13] fix closing of dialed connections --- p2p/transport/testsuite/stream_suite.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index 6d95ac722d..f02824d4cf 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -241,13 +241,13 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, t.Error(err) return } + defer c.Close() // serve the outgoing conn, because some muxers assume // that we _always_ call serve. (this is an error?) wg.Add(1) go func() { defer wg.Done() - defer c.Close() debugLog(t, "serving connection") echo(t, c) }() From 3bd94ff7cbab97e09b9499f2927de12312e1bbea Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 7 Jan 2022 10:55:40 +0400 Subject: [PATCH 10/13] run go generate on CI --- p2p/transport/testsuite/stream_suite.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index f02824d4cf..a2a47063a2 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -15,7 +15,7 @@ import ( crand "crypto/rand" mrand "math/rand" - "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-testing/race" @@ -53,7 +53,7 @@ type Options struct { MsgMax int } -func fullClose(t *testing.T, s mux.MuxedStream) { +func fullClose(t *testing.T, s network.MuxedStream) { if err := s.CloseWrite(); err != nil { t.Error(err) s.Reset() @@ -87,7 +87,7 @@ func debugLog(t *testing.T, s string, args ...interface{}) { } } -func echoStream(t *testing.T, s mux.MuxedStream) { +func echoStream(t *testing.T, s network.MuxedStream) { // echo everything var err error if VerboseDebugging { @@ -164,7 +164,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, rateLimitChan <- struct{}{} } - writeStream := func(s mux.MuxedStream, bufs chan<- []byte) { + writeStream := func(s network.MuxedStream, bufs chan<- []byte) { debugLog(t, "writeStream %p, %d MsgNum", s, opt.MsgNum) for i := 0; i < opt.MsgNum; i++ { @@ -178,7 +178,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } } - readStream := func(s mux.MuxedStream, bufs <-chan []byte) { + readStream := func(s network.MuxedStream, bufs <-chan []byte) { debugLog(t, "readStream %p, %d MsgNum", s, opt.MsgNum) buf2 := make([]byte, msgsize) @@ -198,7 +198,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } } - openStreamAndRW := func(c mux.MuxedConn) { + openStreamAndRW := func(c network.MuxedConn) { debugLog(t, "openStreamAndRW %p, %d opt.MsgNum", c, opt.MsgNum) s, err := c.OpenStream(context.Background()) From 4a572e7eb44096cb0e4aa4338cdaa39e1696911e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 25 Apr 2022 15:48:56 +0100 Subject: [PATCH 11/13] remove debug logging from stream suite --- p2p/transport/testsuite/stream_suite.go | 45 +------------------------ 1 file changed, 1 insertion(+), 44 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index a2a47063a2..8f41a5408d 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -23,10 +23,6 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// VerboseDebugging can be set to true to enable verbose debug logging in the -// stream stress tests. -var VerboseDebugging = false - var randomness []byte var StressTestTimeout = 1 * time.Minute @@ -81,37 +77,13 @@ func randBuf(size int) []byte { return randomness[start : start+size] } -func debugLog(t *testing.T, s string, args ...interface{}) { - if VerboseDebugging { - t.Logf(s, args...) - } -} - func echoStream(t *testing.T, s network.MuxedStream) { // echo everything - var err error - if VerboseDebugging { - t.Logf("accepted stream") - _, err = io.Copy(&logWriter{t, s}, s) - t.Log("closing stream") - } else { - _, err = io.Copy(s, s) // echo everything - } - if err != nil { + if _, err := io.Copy(s, s); err != nil { t.Error(err) } } -type logWriter struct { - t *testing.T - W io.Writer -} - -func (lw *logWriter) Write(buf []byte) (int, error) { - lw.t.Logf("logwriter: writing %d bytes", len(buf)) - return lw.W.Write(buf) -} - func echo(t *testing.T, c transport.CapableConn) { var wg sync.WaitGroup defer wg.Wait() @@ -141,7 +113,6 @@ func serve(t *testing.T, l transport.Listener) { defer c.Close() wg.Add(1) - debugLog(t, "accepted connection") go func() { defer wg.Done() echo(t, c) @@ -165,12 +136,9 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } writeStream := func(s network.MuxedStream, bufs chan<- []byte) { - debugLog(t, "writeStream %p, %d MsgNum", s, opt.MsgNum) - for i := 0; i < opt.MsgNum; i++ { buf := randBuf(msgsize) bufs <- buf - debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3]) if _, err := s.Write(buf); err != nil { t.Errorf("s.Write(buf): %s", err) continue @@ -179,17 +147,13 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } readStream := func(s network.MuxedStream, bufs <-chan []byte) { - debugLog(t, "readStream %p, %d MsgNum", s, opt.MsgNum) - buf2 := make([]byte, msgsize) i := 0 for buf1 := range bufs { i++ - debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { t.Errorf("io.ReadFull(s, buf2): %s", err) - debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) continue } if !bytes.Equal(buf1, buf2) { @@ -199,8 +163,6 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } openStreamAndRW := func(c network.MuxedConn) { - debugLog(t, "openStreamAndRW %p, %d opt.MsgNum", c, opt.MsgNum) - s, err := c.OpenStream(context.Background()) if err != nil { t.Errorf("failed to create NewStream: %s", err) @@ -218,8 +180,6 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, } openConnAndRW := func() { - debugLog(t, "openConnAndRW") - var wg sync.WaitGroup defer wg.Wait() @@ -248,7 +208,6 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, wg.Add(1) go func() { defer wg.Done() - debugLog(t, "serving connection") echo(t, c) }() @@ -263,8 +222,6 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, openWg.Wait() } - debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) - var wg sync.WaitGroup defer wg.Wait() for i := 0; i < opt.ConnNum; i++ { From c130e6e96c931da23fa28b2e1dd5a5002529be69 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 26 Apr 2022 14:24:18 +0100 Subject: [PATCH 12/13] don't continue on read / write error in stream suite (#59) --- p2p/transport/testsuite/stream_suite.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/transport/testsuite/stream_suite.go b/p2p/transport/testsuite/stream_suite.go index 8f41a5408d..e7770bfa71 100644 --- a/p2p/transport/testsuite/stream_suite.go +++ b/p2p/transport/testsuite/stream_suite.go @@ -141,7 +141,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, bufs <- buf if _, err := s.Write(buf); err != nil { t.Errorf("s.Write(buf): %s", err) - continue + return } } } @@ -154,10 +154,11 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, if _, err := io.ReadFull(s, buf2); err != nil { t.Errorf("io.ReadFull(s, buf2): %s", err) - continue + return } if !bytes.Equal(buf1, buf2) { t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + return } } } From 64cfca6bebe65672dc79b93725fd2c2e1cb66d50 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 18 May 2022 13:16:48 +0200 Subject: [PATCH 13/13] switch from github.com/libp2p/go-libp2p-testing/suites/transport to p2p/transport/testsuite --- p2p/transport/tcp/tcp_test.go | 2 +- p2p/transport/websocket/websocket_test.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/p2p/transport/tcp/tcp_test.go b/p2p/transport/tcp/tcp_test.go index 06ea31b2e9..c0f02e2abc 100644 --- a/p2p/transport/tcp/tcp_test.go +++ b/p2p/transport/tcp/tcp_test.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/muxer/yamux" csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" + ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/network" @@ -17,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p-core/transport" mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network" - ttransport "github.com/libp2p/go-libp2p-testing/suites/transport" ma "github.com/multiformats/go-multiaddr" diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index b83f528f84..e7c7aa0f44 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -17,6 +17,7 @@ import ( csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" + ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/network" @@ -27,8 +28,6 @@ import ( "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - ttransport "github.com/libp2p/go-libp2p-testing/suites/transport" - ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" )