From 757fb56c538a8f6f8ea7246487467217fa455e17 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 19 Mar 2021 12:23:44 -0700 Subject: [PATCH 01/30] fix: prefer non-transient connections Given two relay connections, prefer the one that's non-transient. Otherwise, a transient connection could prevent us from opening streams even if we have a second non-transient connection through a better relay. --- p2p/net/swarm/swarm.go | 47 +++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 25f4590178..7a241b4b69 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -385,6 +385,38 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn { return output } +func isBetterConn(a, b *Conn) bool { + // If one is transient and not the other, prefer the non-transient connection. + aTransient := a.Stat().Transient + bTransient := b.Stat().Transient + if aTransient != bTransient { + return !aTransient + } + + // If one is direct and not the other, prefer the direct connection. + aDirect := isDirectConn(a) + bDirect := isDirectConn(b) + if aDirect != bDirect { + return aDirect + } + + // Otherwise, prefer the connection with more open streams. + a.streams.Lock() + aLen := len(a.streams.m) + a.streams.Unlock() + + b.streams.Lock() + bLen := len(b.streams.m) + b.streams.Unlock() + + if aLen != bLen { + return aLen > bLen + } + + // finally, pick the last connection. + return true +} + // bestConnToPeer returns the best connection to peer. func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { @@ -395,26 +427,13 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { defer s.conns.RUnlock() var best *Conn - bestLen := 0 for _, c := range s.conns.m[p] { if c.conn.IsClosed() { // We *will* garbage collect this soon anyways. continue } - c.streams.Lock() - cLen := len(c.streams.m) - c.streams.Unlock() - - // We will never prefer a Relayed connection over a direct connection. - if isDirectConn(best) && !isDirectConn(c) { - continue - } - - // 1. Always prefer a direct connection over a relayed connection. - // 2. If both conns are direct or relayed, pick the one with as many or more streams. - if (!isDirectConn(best) && isDirectConn(c)) || (cLen >= bestLen) { + if best == nil || isBetterConn(c, best) { best = c - bestLen = cLen } } return best From 1e011b1e8460a329a28e2c695942439877d64387 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 19 Mar 2021 12:27:38 -0700 Subject: [PATCH 02/30] fix: DRY direct connect logic --- p2p/net/swarm/swarm.go | 11 +++++++++++ p2p/net/swarm/swarm_dial.go | 29 +++++++---------------------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 7a241b4b69..3b3cf83228 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -439,6 +439,17 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { return best } +func (s *Swarm) bestAcceptableConnToPeer(ctx context.Context, p peer.ID) *Conn { + conn := s.bestConnToPeer(p) + if conn != nil { + forceDirect, _ := network.GetForceDirectDial(ctx) + if !forceDirect || isDirectConn(conn) { + return conn + } + } + return nil +} + func isDirectConn(c *Conn) bool { return c != nil && !c.conn.Transport().Proxy() } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 052da67fe5..ccf33c2e4c 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -251,14 +251,9 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() - conn := s.bestConnToPeer(p) - forceDirect, _ := network.GetForceDirectDial(ctx) - if forceDirect { - if isDirectConn(conn) { - return conn, nil - } - } else if conn != nil { - // check if we already have an open connection first + // check if we already have an open (usable) connection first + conn := s.bestAcceptableConnToPeer(ctx, p) + if conn != nil { return conn, nil } @@ -292,13 +287,8 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { // Short circuit. // By the time we take the dial lock, we may already *have* a connection // to the peer. - forceDirect, _ := network.GetForceDirectDial(ctx) - c := s.bestConnToPeer(p) - if forceDirect { - if isDirectConn(c) { - return c, nil - } - } else if c != nil { + c := s.bestAcceptableConnToPeer(ctx, p) + if c != nil { return c, nil } @@ -310,13 +300,8 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { conn, err := s.dial(ctx, p) if err != nil { - conn = s.bestConnToPeer(p) - if forceDirect { - if isDirectConn(conn) { - log.Debugf("ignoring dial error because we already have a direct connection: %s", err) - return conn, nil - } - } else if conn != nil { + conn := s.bestAcceptableConnToPeer(ctx, p) + if conn != nil { // Hm? What error? // Could have canceled the dial because we received a // connection or some other random reason. From 1b4be471ed9ff90b5a2e0efa29638c1bd475c1c9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 19 Mar 2021 12:36:04 -0700 Subject: [PATCH 03/30] fix: do not use the request context when dialing Otherwise, canceling one dial request will cancel all "joined" dial requests. --- p2p/net/swarm/dial_sync.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index edb6c89821..50f3a69821 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -96,13 +96,17 @@ func (ad *activeDial) start(ctx context.Context) { ad.cancel() } -func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial { +func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { ds.dialsLk.Lock() defer ds.dialsLk.Unlock() actd, ok := ds.dials[p] if !ok { - adctx, cancel := context.WithCancel(ctx) + // This code intentionally uses the background context. Otherwise, if the first call + // to Dial is canceled, subsequent dial calls will also be canceled. + // XXX: this also breaks direct connection logic. We will need to pipe the + // information through some other way. + adctx, cancel := context.WithCancel(context.Background()) actd = &activeDial{ id: p, cancel: cancel, @@ -123,7 +127,7 @@ func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial { // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - return ds.getActiveDial(ctx, p).wait(ctx) + return ds.getActiveDial(p).wait(ctx) } // CancelDial cancels all in-progress dials to the given peer. From 28671c3153f1d18edc4b23aad780540f88a83dc8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 17:14:50 +0300 Subject: [PATCH 04/30] implement dial worker for synchronizing simultaneous dials --- p2p/net/swarm/dial_sync.go | 114 ++++----- p2p/net/swarm/swarm.go | 8 +- p2p/net/swarm/swarm_dial.go | 493 ++++++++++++++++++++++-------------- 3 files changed, 349 insertions(+), 266 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 50f3a69821..2efdf067b6 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ) @@ -12,88 +13,74 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. -type DialFunc func(context.Context, peer.ID) (*Conn, error) +type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) // NewDialSync constructs a new DialSync -func NewDialSync(dfn DialFunc) *DialSync { +func NewDialSync(worker DialWorkerFunc) *DialSync { return &DialSync{ - dials: make(map[peer.ID]*activeDial), - dialFunc: dfn, + dials: make(map[peer.ID]*activeDial), + dialWorker: worker, } } // DialSync is a dial synchronization helper that ensures that at most one dial // to any given peer is active at any given time. type DialSync struct { - dials map[peer.ID]*activeDial - dialsLk sync.Mutex - dialFunc DialFunc + dials map[peer.ID]*activeDial + dialsLk sync.Mutex + dialWorker DialWorkerFunc } type activeDial struct { - id peer.ID - refCnt int - refCntLk sync.Mutex - cancel func() + id peer.ID + refCnt int - err error - conn *Conn - waitch chan struct{} + ctx context.Context + cancel func() - ds *DialSync -} + reqch chan dialRequest -func (ad *activeDial) wait(ctx context.Context) (*Conn, error) { - defer ad.decref() - select { - case <-ad.waitch: - return ad.conn, ad.err - case <-ctx.Done(): - return nil, ctx.Err() - } + ds *DialSync } func (ad *activeDial) incref() { - ad.refCntLk.Lock() - defer ad.refCntLk.Unlock() ad.refCnt++ } func (ad *activeDial) decref() { - ad.refCntLk.Lock() + ad.ds.dialsLk.Lock() ad.refCnt-- - maybeZero := (ad.refCnt <= 0) - ad.refCntLk.Unlock() - - // make sure to always take locks in correct order. - if maybeZero { - ad.ds.dialsLk.Lock() - ad.refCntLk.Lock() - // check again after lock swap drop to make sure nobody else called incref - // in between locks - if ad.refCnt <= 0 { - ad.cancel() - delete(ad.ds.dials, ad.id) - } - ad.refCntLk.Unlock() - ad.ds.dialsLk.Unlock() + if ad.refCnt == 0 { + ad.cancel() + close(ad.reqch) + delete(ad.ds.dials, ad.id) } + ad.ds.dialsLk.Unlock() } -func (ad *activeDial) start(ctx context.Context) { - ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id) - - // This isn't the user's context so we should fix the error. - switch ad.err { - case context.Canceled: - // The dial was canceled with `CancelDial`. - ad.err = errDialCanceled - case context.DeadlineExceeded: - // We hit an internal timeout, not a context timeout. - ad.err = ErrDialTimeout +func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { + dialCtx := ad.ctx + + if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect { + dialCtx = network.WithForceDirectDial(dialCtx, reason) + } + if simConnect, reason := network.GetSimultaneousConnect(ctx); simConnect { + dialCtx = network.WithSimultaneousConnect(dialCtx, reason) + } + + resch := make(chan dialResponse, 1) + select { + case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: + case <-ctx.Done(): + return nil, ctx.Err() + } + + select { + case res := <-resch: + return res.conn, res.err + case <-ctx.Done(): + return nil, ctx.Err() } - close(ad.waitch) - ad.cancel() } func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { @@ -109,13 +96,14 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { adctx, cancel := context.WithCancel(context.Background()) actd = &activeDial{ id: p, + ctx: adctx, cancel: cancel, - waitch: make(chan struct{}), + reqch: make(chan dialRequest), ds: ds, } ds.dials[p] = actd - go actd.start(adctx) + go ds.dialWorker(adctx, p, actd.reqch) } // increase ref count before dropping dialsLk @@ -127,14 +115,8 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - return ds.getActiveDial(p).wait(ctx) -} + ad := ds.getActiveDial(p) + defer ad.decref() -// CancelDial cancels all in-progress dials to the given peer. -func (ds *DialSync) CancelDial(p peer.ID) { - ds.dialsLk.Lock() - defer ds.dialsLk.Unlock() - if ad, ok := ds.dials[p]; ok { - ad.cancel() - } + return ad.dial(ctx, p) } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 3b3cf83228..c57c563c25 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = NewDialSync(s.doDial) + s.dsync = NewDialSync(s.dialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) @@ -259,12 +259,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - // We have a connection now. Cancel all other in-progress dials. - // This should be fast, no reason to wait till later. - if dir == network.DirOutbound { - s.dsync.CancelDial(p) - } - s.notifyAll(func(f network.Notifiee) { f.Connected(s, c) }) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index ccf33c2e4c..cc739791dc 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -14,7 +14,6 @@ import ( addrutil "github.com/libp2p/go-addr-util" lgbl "github.com/libp2p/go-libp2p-loggables" - logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -58,6 +57,12 @@ var ( ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer") ) +var ( + DelayDialPrivateAddr = 5 * time.Millisecond + DelayDialPublicAddr = 50 * time.Millisecond + DelayDialRelayAddr = 100 * time.Millisecond +) + // DialAttempts governs how many times a goroutine will try to dial a given peer. // Note: this is down to one, as we have _too many dials_ atm. To add back in, // add loop back in Dial(.) @@ -281,39 +286,306 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } -// doDial is an ugly shim method to retain all the logging and backoff logic -// of the old dialsync code -func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { - // Short circuit. - // By the time we take the dial lock, we may already *have* a connection - // to the peer. - c := s.bestAcceptableConnToPeer(ctx, p) - if c != nil { - return c, nil +/////////////////////////////////////////////////////////////////////////////////// +// lo and behold, The Dialer +// TODO explain how all this works +////////////////////////////////////////////////////////////////////////////////// +type dialRequest struct { + ctx context.Context + resch chan dialResponse +} + +type dialResponse struct { + conn *Conn + err error +} + +type dialComplete struct { + addr ma.Multiaddr + conn *Conn + err error +} + +// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { + if p == s.local { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.resch <- dialResponse{err: ErrDialToSelf} + } + } } - logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) + s.dialWorkerLoop(ctx, p, reqch) +} - // ok, we have been charged to dial! let's do it. - // if it succeeds, dial will add the conn to the swarm itself. - defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { + defer s.limiter.clearAllPeerDials(p) - conn, err := s.dial(ctx, p) - if err != nil { - conn := s.bestAcceptableConnToPeer(ctx, p) - if conn != nil { - // Hm? What error? - // Could have canceled the dial because we received a - // connection or some other random reason. - // Just ignore the error and return the connection. - log.Debugf("ignoring dial error because we already have a connection: %s", err) - return conn, nil + type pendRequest struct { + req dialRequest // the original request + err *DialError // dial error accumulator + addrs map[ma.Multiaddr]struct{} // pending addr dials + } + + type addrDial struct { + ctx context.Context + conn *Conn + err error + requests []int + } + + reqno := 0 + requests := make(map[int]*pendRequest) + pending := make(map[ma.Multiaddr]*addrDial) + + var triggerDial <-chan time.Time + var nextDial []ma.Multiaddr + active := 0 + done := false + + resch := make(chan dialComplete) + +loop: + for { + select { + case req, ok := <-reqch: + if !ok { + // request channel has been closed, wait for pending dials to complete + if active > 0 { + done = true + reqch = nil + triggerDial = nil + continue loop + } + + // no active dials, we are done + return + } + + c := s.bestAcceptableConnToPeer(req.ctx, p) + if c != nil { + req.resch <- dialResponse{conn: c} + continue loop + } + + addrs, err := s.addrsForDial(req.ctx, p) + if err != nil { + req.resch <- dialResponse{err: err} + continue loop + } + + // at this point, len(addrs) > 0 or else it would be error from addrsForDial + // ranke them to process in order + addrs = s.rankAddrs(addrs) + + // create the pending request object + pr := &pendRequest{ + req: req, + err: &DialError{Peer: p}, + addrs: make(map[ma.Multiaddr]struct{}), + } + for _, a := range addrs { + pr.addrs[a] = struct{}{} + } + + // check if any of the addrs has been successfully dialed and accumulate + // errors from complete dials while collecting new addrs to dial/join + var todial []ma.Multiaddr + var tojoin []*addrDial + + for _, a := range addrs { + ad, ok := pending[a] + if !ok { + todial = append(todial, a) + continue + } + + if ad.conn != nil { + // dial to this addr was successful, complete the request + req.resch <- dialResponse{conn: ad.conn} + continue loop + } + + if ad.err != nil { + // dial to this addr errored, accumulate the error + pr.err.recordErr(a, ad.err) + delete(pr.addrs, a) + } + + // dial is still pending, add to the join list + tojoin = append(tojoin, ad) + } + + if len(todial) == 0 && len(tojoin) == 0 { + // all request applicable addrs have been dialed, we must have errored + req.resch <- dialResponse{err: pr.err} + continue loop + } + + // the request has some pending or new dials, track it and schedule new dials + reqno++ + requests[reqno] = pr + + for _, ad := range tojoin { + ad.requests = append(ad.requests, reqno) + } + + if len(todial) > 0 { + for _, a := range todial { + pending[a] = &addrDial{ctx: req.ctx, requests: []int{reqno}} + } + + nextDial = append(nextDial, todial...) + nextDial = s.rankAddrs(nextDial) + + if triggerDial == nil { + trigger := make(chan time.Time) + close(trigger) + triggerDial = trigger + } + } + + case <-triggerDial: + if len(nextDial) == 0 { + triggerDial = nil + continue loop + } + + next := nextDial[0] + nextDial = nextDial[1:] + + // spawn the next dial + ad := pending[next] + go s.dialNextAddr(ad.ctx, p, next, resch) + active++ + + // select an appropriate delay for the next dial trigger + delay := s.delayForNextDial(next) + triggerDial = time.After(delay) + + case res := <-resch: + active-- + + if done && active == 0 { + return + } + + ad := pending[res.addr] + ad.conn = res.conn + ad.err = res.err + + dialRequests := ad.requests + ad.requests = nil + + if res.conn != nil { + // we got a connection, dispatch to still pending requests + for _, reqno := range dialRequests { + pr, ok := requests[reqno] + if !ok { + // it has already dispatched a connection + continue + } + + pr.req.resch <- dialResponse{conn: res.conn} + delete(requests, reqno) + } + + continue loop + } + + // it must be an error, accumulate it and dispatch dial error if the request has tried all addrs + for _, reqno := range dialRequests { + pr, ok := requests[reqno] + if !ok { + // has already been dispatched + continue + } + + // accumulate the error + pr.err.recordErr(res.addr, res.err) + + delete(pr.addrs, res.addr) + if len(pr.addrs) == 0 { + // all addrs have erred, dispatch dial error + pr.req.resch <- dialResponse{err: pr.err} + delete(requests, reqno) + } + } } + } +} - // ok, we failed. - return nil, err +func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, error) { + peerAddrs := s.peers.Addrs(p) + if len(peerAddrs) == 0 { + return nil, ErrNoAddresses + } + + goodAddrs := s.filterKnownUndialables(p, peerAddrs) + if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect { + goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr) } - return conn, nil + + if len(goodAddrs) == 0 { + return nil, ErrNoGoodAddresses + } + + return goodAddrs, nil +} + +func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) { + // check the dial backoff + if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { + if s.backf.Backoff(p, addr) { + resch <- dialComplete{addr: addr, err: ErrDialBackoff} + return + } + } + + // start the dial + dresch := make(chan dialResult) + s.limitedDial(ctx, p, addr, dresch) + select { + case res := <-dresch: + if res.Err != nil { + if res.Err != context.Canceled { + s.backf.AddBackoff(p, addr) + } + + resch <- dialComplete{addr: addr, err: res.Err} + return + } + + conn, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + res.Conn.Close() + resch <- dialComplete{addr: addr, err: err} + return + } + + resch <- dialComplete{addr: addr, conn: conn} + + case <-ctx.Done(): + resch <- dialComplete{addr: addr, err: ctx.Err()} + } +} + +func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { + if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { + return DelayDialRelayAddr + } + + if manet.IsPrivateAddr(addr) { + return DelayDialPrivateAddr + } + + return DelayDialPublicAddr } func (s *Swarm) canDial(addr ma.Multiaddr) bool { @@ -365,80 +637,6 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...) } -// dial is the actual swarm's dial logic, gated by Dial. -func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { - forceDirect, _ := network.GetForceDirectDial(ctx) - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - if p == s.local { - log.Event(ctx, "swarmDialDoDialSelf", logdial) - return nil, ErrDialToSelf - } - defer log.EventBegin(ctx, "swarmDialDo", logdial).Done() - logdial["dial"] = "failure" // start off with failure. set to "success" at the end. - - sk := s.peers.PrivKey(s.local) - logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not. - if sk == nil { - // fine for sk to be nil, just log. - log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") - } - - ////// - peerAddrs := s.peers.Addrs(p) - if len(peerAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoAddresses} - } - goodAddrs := s.filterKnownUndialables(p, peerAddrs) - if forceDirect { - goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr) - } - if len(goodAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} - } - - if !forceDirect { - /////// Check backoff andnRank addresses - var nonBackoff bool - for _, a := range goodAddrs { - // skip addresses in back-off - if !s.backf.Backoff(p, a) { - nonBackoff = true - } - } - if !nonBackoff { - return nil, ErrDialBackoff - } - } - - connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs)) - if dialErr != nil { - logdial["error"] = dialErr.Cause.Error() - switch dialErr.Cause { - case context.Canceled, context.DeadlineExceeded: - // Always prefer the context errors as we rely on being - // able to check them. - // - // Removing this will BREAK backoff (causing us to - // backoff when canceling dials). - return nil, dialErr.Cause - } - return nil, dialErr - } - logdial["conn"] = logging.Metadata{ - "localAddr": connC.LocalMultiaddr(), - "remoteAddr": connC.RemoteMultiaddr(), - } - swarmC, err := s.addConn(connC, network.DirOutbound) - if err != nil { - logdial["error"] = err.Error() - connC.Close() // close the connection. didn't work out :( - return nil, &DialError{Peer: p, Cause: err} - } - - logdial["dial"] = "success" - return swarmC, nil -} - // filterKnownUndialables takes a list of multiaddrs, and removes those // that we definitely don't want to dial: addresses configured to be blocked, // IPv6 link-local addresses, addresses without a dial-capable transport, @@ -466,98 +664,6 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul ) } -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) { - /* - This slice-to-chan code is temporary, the peerstore can currently provide - a channel as an interface for receiving addresses, but more thought - needs to be put into the execution. For now, this allows us to use - the improved rate limiter, while maintaining the outward behaviour - that we previously had (halting a dial when we run out of addrs) - */ - var remoteAddrChan chan ma.Multiaddr - if len(remoteAddrs) > 0 { - remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs)) - for i := range remoteAddrs { - remoteAddrChan <- remoteAddrs[i] - } - close(remoteAddrChan) - } - - log.Debugf("%s swarm dialing %s", s.local, p) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() // cancel work when we exit func - - // use a single response type instead of errs and conns, reduces complexity *a ton* - respch := make(chan dialResult) - err := &DialError{Peer: p} - - defer s.limiter.clearAllPeerDials(p) - - var active int -dialLoop: - for remoteAddrChan != nil || active > 0 { - // Check for context cancellations and/or responses first. - select { - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - if resp.Err != context.Canceled { - s.backf.AddBackoff(p, resp.Addr) - } - - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - - // We got a result, try again from the top. - continue - default: - } - - // Now, attempt to dial. - select { - case addr, ok := <-remoteAddrChan: - if !ok { - remoteAddrChan = nil - continue - } - - s.limitedDial(ctx, p, addr, respch) - active++ - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - if resp.Err != context.Canceled { - s.backf.AddBackoff(p, resp.Addr) - } - - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - } - } - - if ctxErr := ctx.Err(); ctxErr != nil { - err.Cause = ctxErr - } else if len(err.DialErrors) == 0 { - err.Cause = network.ErrNoRemoteAddrs - } else { - err.Cause = ErrAllDialsFailed - } - return nil, err -} - // limitedDial will start a dial to the given peer when // it is able, respecting the various different types of rate // limiting that occur without using extra goroutines per addr @@ -570,6 +676,7 @@ func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp }) } +// dialAddr is the actual dial for an addr, indirectly invoked through the limiter func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) { // Just to double check. Costs nothing. if s.local == p { From 30d51370fc298dac8f9f6cb77d319c5e4caf5e05 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 17:46:01 +0300 Subject: [PATCH 05/30] adjust next dial delays --- p2p/net/swarm/swarm_dial.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index cc739791dc..428845f344 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -59,8 +59,8 @@ var ( var ( DelayDialPrivateAddr = 5 * time.Millisecond - DelayDialPublicAddr = 50 * time.Millisecond - DelayDialRelayAddr = 100 * time.Millisecond + DelayDialPublicAddr = 25 * time.Millisecond + DelayDialRelayAddr = 50 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. From 1efe95aff0fff1b2dc776e951f69ae0714ea52b0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 20:38:48 +0300 Subject: [PATCH 06/30] fix dial_sync tests --- p2p/net/swarm/dial_sync.go | 12 +++--- p2p/net/swarm/dial_sync_test.go | 70 +++++++++++++++++++++++++-------- p2p/net/swarm/swarm_dial.go | 38 +++++++++--------- 3 files changed, 79 insertions(+), 41 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 2efdf067b6..54a21067b7 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -13,7 +13,7 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. -type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) +type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -38,7 +38,7 @@ type activeDial struct { ctx context.Context cancel func() - reqch chan dialRequest + reqch chan DialRequest ds *DialSync } @@ -68,16 +68,16 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { dialCtx = network.WithSimultaneousConnect(dialCtx, reason) } - resch := make(chan dialResponse, 1) + resch := make(chan DialResponse, 1) select { - case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: + case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}: case <-ctx.Done(): return nil, ctx.Err() } select { case res := <-resch: - return res.conn, res.err + return res.Conn, res.Err case <-ctx.Done(): return nil, ctx.Err() } @@ -98,7 +98,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { id: p, ctx: adctx, cancel: cancel, - reqch: make(chan dialRequest), + reqch: make(chan DialRequest), ds: ds, } ds.dials[p] = actd diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go index 485d1a3171..ef7458a554 100644 --- a/p2p/net/swarm/dial_sync_test.go +++ b/p2p/net/swarm/dial_sync_test.go @@ -12,19 +12,33 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) { +func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{}) { dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID) (*Conn, error) { + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { dfcalls <- struct{}{} - defer cancel() - select { - case <-ch: - return new(Conn), nil - case <-ctx.Done(): - return nil, ctx.Err() - } + go func() { + defer cancel() + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + select { + case <-ch: + req.Resch <- DialResponse{Conn: new(Conn)} + case <-ctx.Done(): + req.Resch <- DialResponse{Err: ctx.Err()} + return + } + case <-ctx.Done(): + return + } + } + }() } o := new(sync.Once) @@ -174,12 +188,25 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - if count > 0 { - return new(Conn), nil + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + if count > 0 { + req.Resch <- DialResponse{Conn: new(Conn)} + } else { + req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + } + count++ + + case <-ctx.Done(): + return + } } - count++ - return nil, fmt.Errorf("gophers ate the modem") } ds := NewDialSync(f) @@ -205,8 +232,19 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID) (*Conn, error) { - return nil, nil + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.Resch <- DialResponse{} + case <-ctx.Done(): + return + } + } }) wg := sync.WaitGroup{} diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 428845f344..ae4c87f1ad 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -290,14 +290,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // lo and behold, The Dialer // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// -type dialRequest struct { - ctx context.Context - resch chan dialResponse +type DialRequest struct { + Ctx context.Context + Resch chan DialResponse } -type dialResponse struct { - conn *Conn - err error +type DialResponse struct { + Conn *Conn + Err error } type dialComplete struct { @@ -307,7 +307,7 @@ type dialComplete struct { } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { if p == s.local { for { select { @@ -316,7 +316,7 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequ return } - req.resch <- dialResponse{err: ErrDialToSelf} + req.Resch <- DialResponse{Err: ErrDialToSelf} } } } @@ -324,11 +324,11 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequ s.dialWorkerLoop(ctx, p, reqch) } -func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { defer s.limiter.clearAllPeerDials(p) type pendRequest struct { - req dialRequest // the original request + req DialRequest // the original request err *DialError // dial error accumulator addrs map[ma.Multiaddr]struct{} // pending addr dials } @@ -368,15 +368,15 @@ loop: return } - c := s.bestAcceptableConnToPeer(req.ctx, p) + c := s.bestAcceptableConnToPeer(req.Ctx, p) if c != nil { - req.resch <- dialResponse{conn: c} + req.Resch <- DialResponse{Conn: c} continue loop } - addrs, err := s.addrsForDial(req.ctx, p) + addrs, err := s.addrsForDial(req.Ctx, p) if err != nil { - req.resch <- dialResponse{err: err} + req.Resch <- DialResponse{Err: err} continue loop } @@ -408,7 +408,7 @@ loop: if ad.conn != nil { // dial to this addr was successful, complete the request - req.resch <- dialResponse{conn: ad.conn} + req.Resch <- DialResponse{Conn: ad.conn} continue loop } @@ -424,7 +424,7 @@ loop: if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored - req.resch <- dialResponse{err: pr.err} + req.Resch <- DialResponse{Err: pr.err} continue loop } @@ -438,7 +438,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{ctx: req.ctx, requests: []int{reqno}} + pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -492,7 +492,7 @@ loop: continue } - pr.req.resch <- dialResponse{conn: res.conn} + pr.req.Resch <- DialResponse{Conn: res.conn} delete(requests, reqno) } @@ -513,7 +513,7 @@ loop: delete(pr.addrs, res.addr) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error - pr.req.resch <- dialResponse{err: pr.err} + pr.req.Resch <- DialResponse{Err: pr.err} delete(requests, reqno) } } From b69afe8dd9eb1d813b8b1cfac227de017641b520 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 21:35:48 +0300 Subject: [PATCH 07/30] clear address dial when they fail because of backoff makes TestDialBackoff happy --- p2p/net/swarm/swarm_dial.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index ae4c87f1ad..9b10db165b 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -517,6 +517,11 @@ loop: delete(requests, reqno) } } + + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + if res.err == ErrDialBackoff { + delete(pending, res.addr) + } } } } From 790eddaaa647ba3c91f3729d4af3d5916a119803 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 12:27:51 +0300 Subject: [PATCH 08/30] nuke incref, it's useless --- p2p/net/swarm/dial_sync.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 54a21067b7..e5f25478cb 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -43,10 +43,6 @@ type activeDial struct { ds *DialSync } -func (ad *activeDial) incref() { - ad.refCnt++ -} - func (ad *activeDial) decref() { ad.ds.dialsLk.Lock() ad.refCnt-- @@ -107,7 +103,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { } // increase ref count before dropping dialsLk - actd.incref() + actd.refCnt++ return actd } From df78bf89f4194eec30d3bfdb140729659146ae64 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 12:34:54 +0300 Subject: [PATCH 09/30] make dialWorker return an error for self dials and responsible for spawning the loop --- p2p/net/swarm/dial_sync.go | 19 ++++++---- p2p/net/swarm/dial_sync_test.go | 61 ++++++++++++++++++--------------- p2p/net/swarm/swarm_dial.go | 16 +++------ 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index e5f25478cb..32196cb2d5 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -13,7 +13,7 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. -type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) +type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) error // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -79,7 +79,7 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { } } -func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { +func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { ds.dialsLk.Lock() defer ds.dialsLk.Unlock() @@ -99,20 +99,27 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { } ds.dials[p] = actd - go ds.dialWorker(adctx, p, actd.reqch) + err := ds.dialWorker(adctx, p, actd.reqch) + if err != nil { + cancel() + return nil, err + } } // increase ref count before dropping dialsLk actd.refCnt++ - return actd + return actd, nil } // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - ad := ds.getActiveDial(p) - defer ad.decref() + ad, err := ds.getActiveDial(p) + if err != nil { + return nil, err + } + defer ad.decref() return ad.dial(ctx, p) } diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go index ef7458a554..f1a9f8a539 100644 --- a/p2p/net/swarm/dial_sync_test.go +++ b/p2p/net/swarm/dial_sync_test.go @@ -16,7 +16,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { dfcalls <- struct{}{} go func() { defer cancel() @@ -39,6 +39,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} } } }() + return nil } o := new(sync.Once) @@ -188,25 +189,28 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { - for { - select { - case req, ok := <-reqch: - if !ok { - return - } + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + go func() { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } - if count > 0 { - req.Resch <- DialResponse{Conn: new(Conn)} - } else { - req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} - } - count++ + if count > 0 { + req.Resch <- DialResponse{Conn: new(Conn)} + } else { + req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + } + count++ - case <-ctx.Done(): - return + case <-ctx.Done(): + return + } } - } + }() + return nil } ds := NewDialSync(f) @@ -232,19 +236,22 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { - for { - select { - case req, ok := <-reqch: - if !ok { + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + go func() { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.Resch <- DialResponse{} + case <-ctx.Done(): return } - - req.Resch <- DialResponse{} - case <-ctx.Done(): - return } - } + }() + return nil }) wg := sync.WaitGroup{} diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 9b10db165b..77cb8a6bc7 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -307,21 +307,13 @@ type dialComplete struct { } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { if p == s.local { - for { - select { - case req, ok := <-reqch: - if !ok { - return - } - - req.Resch <- DialResponse{Err: ErrDialToSelf} - } - } + return ErrDialToSelf } - s.dialWorkerLoop(ctx, p, reqch) + go s.dialWorkerLoop(ctx, p, reqch) + return nil } func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { From 88df3095026e362e8eb89b90dd2fa0317d9722a4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 13:58:16 +0300 Subject: [PATCH 10/30] don't use a goroutine for the actual dial --- p2p/net/swarm/swarm_dial.go | 138 +++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 66 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 77cb8a6bc7..8f1e2c3cb8 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -290,6 +290,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // lo and behold, The Dialer // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// + type DialRequest struct { Ctx context.Context Resch chan DialResponse @@ -300,12 +301,6 @@ type DialResponse struct { Err error } -type dialComplete struct { - addr ma.Multiaddr - conn *Conn - err error -} - // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { if p == s.local { @@ -326,6 +321,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } type addrDial struct { + addr ma.Multiaddr ctx context.Context conn *Conn err error @@ -336,12 +332,40 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial requests := make(map[int]*pendRequest) pending := make(map[ma.Multiaddr]*addrDial) + dispatchError := func(ad *addrDial, err error) { + ad.err = err + for _, reqno := range ad.requests { + pr, ok := requests[reqno] + if !ok { + // has already been dispatched + continue + } + + // accumulate the error + pr.err.recordErr(ad.addr, err) + + delete(pr.addrs, ad.addr) + if len(pr.addrs) == 0 { + // all addrs have erred, dispatch dial error + pr.req.Resch <- DialResponse{Err: pr.err} + delete(requests, reqno) + } + } + + ad.requests = nil + + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + if err == ErrDialBackoff { + delete(pending, ad.addr) + } + } + var triggerDial <-chan time.Time var nextDial []ma.Multiaddr active := 0 done := false - resch := make(chan dialComplete) + resch := make(chan dialResult) loop: for { @@ -408,6 +432,7 @@ loop: // dial to this addr errored, accumulate the error pr.err.recordErr(a, ad.err) delete(pr.addrs, a) + continue } // dial is still pending, add to the join list @@ -430,7 +455,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -454,7 +479,12 @@ loop: // spawn the next dial ad := pending[next] - go s.dialNextAddr(ad.ctx, p, next, resch) + err := s.dialNextAddr(ad.ctx, p, next, resch) + if err != nil { + dispatchError(ad, err) + continue loop + } + active++ // select an appropriate delay for the next dial trigger @@ -465,55 +495,54 @@ loop: active-- if done && active == 0 { + if res.Conn != nil { + // we got an actual connection, but the dial has been cancelled + // Should we close it? I think not, we should just add it to the swarm + _, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // well duh, now we have to close it + res.Conn.Close() + } + } return } - ad := pending[res.addr] - ad.conn = res.conn - ad.err = res.err + ad := pending[res.Addr] - dialRequests := ad.requests - ad.requests = nil + if res.Conn != nil { + // we got a connection, add it to the swarm + conn, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // oops no, we failed to add it to the swarm + res.Conn.Close() + dispatchError(ad, err) + continue loop + } - if res.conn != nil { - // we got a connection, dispatch to still pending requests - for _, reqno := range dialRequests { + // dispatch to still pending requests + for _, reqno := range ad.requests { pr, ok := requests[reqno] if !ok { // it has already dispatched a connection continue } - pr.req.Resch <- DialResponse{Conn: res.conn} + pr.req.Resch <- DialResponse{Conn: conn} delete(requests, reqno) } + ad.conn = conn + ad.requests = nil + continue loop } - // it must be an error, accumulate it and dispatch dial error if the request has tried all addrs - for _, reqno := range dialRequests { - pr, ok := requests[reqno] - if !ok { - // has already been dispatched - continue - } - - // accumulate the error - pr.err.recordErr(res.addr, res.err) - - delete(pr.addrs, res.addr) - if len(pr.addrs) == 0 { - // all addrs have erred, dispatch dial error - pr.req.Resch <- DialResponse{Err: pr.err} - delete(requests, reqno) - } + // it must be an error -- add backoff if applicable and dispatch + if res.Err != context.Canceled { + s.backf.AddBackoff(p, res.Addr) } - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests - if res.err == ErrDialBackoff { - delete(pending, res.addr) - } + dispatchError(ad, res.Err) } } } @@ -536,41 +565,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er return goodAddrs, nil } -func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) { +func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { if s.backf.Backoff(p, addr) { - resch <- dialComplete{addr: addr, err: ErrDialBackoff} - return + return ErrDialBackoff } } // start the dial - dresch := make(chan dialResult) - s.limitedDial(ctx, p, addr, dresch) - select { - case res := <-dresch: - if res.Err != nil { - if res.Err != context.Canceled { - s.backf.AddBackoff(p, addr) - } - - resch <- dialComplete{addr: addr, err: res.Err} - return - } + s.limitedDial(ctx, p, addr, resch) - conn, err := s.addConn(res.Conn, network.DirOutbound) - if err != nil { - res.Conn.Close() - resch <- dialComplete{addr: addr, err: err} - return - } - - resch <- dialComplete{addr: addr, conn: conn} - - case <-ctx.Done(): - resch <- dialComplete{addr: addr, err: ctx.Err()} - } + return nil } func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { From 0cf03ba71aad3a5b63780e3b178aadf40538f95b Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:12:45 +0300 Subject: [PATCH 11/30] batch dials together, rework address ranking --- p2p/net/swarm/swarm_dial.go | 163 +++++++++++++++++++++++------------- 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 8f1e2c3cb8..3d8ec3df33 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -58,9 +58,9 @@ var ( ) var ( - DelayDialPrivateAddr = 5 * time.Millisecond - DelayDialPublicAddr = 25 * time.Millisecond - DelayDialRelayAddr = 50 * time.Millisecond + delayDialPrivateAddr = 5 * time.Millisecond + delayDialPublicAddr = 25 * time.Millisecond + delayDialRelayAddr = 50 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. @@ -361,6 +361,9 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } var triggerDial <-chan time.Time + triggerNow := make(chan time.Time) + close(triggerNow) + var nextDial []ma.Multiaddr active := 0 done := false @@ -461,34 +464,46 @@ loop: nextDial = append(nextDial, todial...) nextDial = s.rankAddrs(nextDial) - if triggerDial == nil { - trigger := make(chan time.Time) - close(trigger) - triggerDial = trigger - } + // trigger a new dial now to account for the new addrs we added + triggerDial = triggerNow } case <-triggerDial: - if len(nextDial) == 0 { - triggerDial = nil - continue loop - } + // we dial batches of addresses together, logically belonging to the same batch + // after a batch of addresses has been dialed, we add a delay before initiating the next batch + dialed := false + last := 0 + next := 0 + for i, addr := range nextDial { + if dialed && !s.sameAddrBatch(nextDial[last], addr) { + break + } - next := nextDial[0] - nextDial = nextDial[1:] + next = i + 1 - // spawn the next dial - ad := pending[next] - err := s.dialNextAddr(ad.ctx, p, next, resch) - if err != nil { - dispatchError(ad, err) - continue loop + // spawn the dial + ad := pending[addr] + err := s.dialNextAddr(ad.ctx, p, addr, resch) + if err != nil { + dispatchError(ad, err) + continue + } + + dialed = true + last = i + active++ } - active++ + lastDial := nextDial[last] + nextDial = nextDial[next:] + if !dialed || len(nextDial) == 0 { + // we didn't dial anything because of backoff or we don't have any more addresses + triggerDial = nil + continue loop + } - // select an appropriate delay for the next dial trigger - delay := s.delayForNextDial(next) + // select an appropriate delay for the next dial batch + delay := s.delayForNextDial(lastDial) triggerDial = time.After(delay) case res := <-resch: @@ -516,6 +531,9 @@ loop: // oops no, we failed to add it to the swarm res.Conn.Close() dispatchError(ad, err) + if active == 0 && len(nextDial) > 0 { + triggerDial = triggerNow + } continue loop } @@ -543,6 +561,9 @@ loop: } dispatchError(ad, res.Err) + if active == 0 && len(nextDial) > 0 { + triggerDial = triggerNow + } } } } @@ -579,16 +600,37 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, return nil } +func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool { + // is it a relay addr? + if s.IsRelayAddr(a) { + return s.IsRelayAddr(b) + } + + // is it an expensive addr? + if s.IsExpensiveAddr(a) { + return s.IsExpensiveAddr(b) + } + + // is it a public addr? + if !manet.IsPrivateAddr(a) { + return !manet.IsPrivateAddr(b) && + s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b) + } + + // it's a private addr + return manet.IsPrivateAddr(b) +} + func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { - return DelayDialRelayAddr + return delayDialRelayAddr } if manet.IsPrivateAddr(addr) { - return DelayDialPrivateAddr + return delayDialPrivateAddr } - return DelayDialPublicAddr + return delayDialPublicAddr } func (s *Swarm) canDial(addr ma.Multiaddr) bool { @@ -601,43 +643,41 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { return !t.Proxy() } -// ranks addresses in descending order of preference for dialing -// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server +// ranks addresses in descending order of preference for dialing, with the following rules: +// NonRelay > Relay +// NonWS > WS +// Private > Public +// UDP > TCP func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var localUdpAddrs []ma.Multiaddr // private udp - var relayUdpAddrs []ma.Multiaddr // relay udp - var othersUdp []ma.Multiaddr // public udp + addrTier := func(a ma.Multiaddr) (tier int) { + if s.IsRelayAddr(a) { + tier |= 0b1000 + } + if s.IsExpensiveAddr(a) { + tier |= 0b0100 + } + if !manet.IsPrivateAddr(a) { + tier |= 0b0010 + } + if s.IsFdConsumingAddr(a) { + tier |= 0b0001 + } - var localFdAddrs []ma.Multiaddr // private fd consuming - var relayFdAddrs []ma.Multiaddr // relay fd consuming - var othersFd []ma.Multiaddr // public fd consuming + return tier + } + tiers := make([][]ma.Multiaddr, 16) for _, a := range addrs { - if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { - if s.IsFdConsumingAddr(a) { - relayFdAddrs = append(relayFdAddrs, a) - continue - } - relayUdpAddrs = append(relayUdpAddrs, a) - } else if manet.IsPrivateAddr(a) { - if s.IsFdConsumingAddr(a) { - localFdAddrs = append(localFdAddrs, a) - continue - } - localUdpAddrs = append(localUdpAddrs, a) - } else { - if s.IsFdConsumingAddr(a) { - othersFd = append(othersFd, a) - continue - } - othersUdp = append(othersUdp, a) - } + tier := addrTier(a) + tiers[tier] = append(tiers[tier], a) } - relays := append(relayUdpAddrs, relayFdAddrs...) - fds := append(localFdAddrs, othersFd...) + result := make([]ma.Multiaddr, 0, len(addrs)) + for _, tier := range tiers { + result = append(result, tier...) + } - return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...) + return result } // filterKnownUndialables takes a list of multiaddrs, and removes those @@ -729,3 +769,14 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { _, err2 := first.ValueForProtocol(ma.P_UNIX) return err1 == nil || err2 == nil } + +func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool { + _, err1 := addr.ValueForProtocol(ma.P_WS) + _, err2 := addr.ValueForProtocol(ma.P_WSS) + return err1 == nil || err2 == nil +} + +func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool { + _, err := addr.ValueForProtocol(ma.P_CIRCUIT) + return err == nil +} From 3707a55a7e0b9f50912ad37cadc9556e63be0339 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:28:10 +0300 Subject: [PATCH 12/30] tune down batch dial delays --- p2p/net/swarm/swarm_dial.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 3d8ec3df33..2b7f0b03e9 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -58,9 +58,9 @@ var ( ) var ( - delayDialPrivateAddr = 5 * time.Millisecond - delayDialPublicAddr = 25 * time.Millisecond - delayDialRelayAddr = 50 * time.Millisecond + delayDialPrivateAddr = 1 * time.Millisecond + delayDialPublicAddr = 5 * time.Millisecond + delayDialRelayAddr = 10 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. From 0bd0fcded706eed3345f92092951ff96dd385554 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:42:04 +0300 Subject: [PATCH 13/30] use a timer instead of time.After --- p2p/net/swarm/swarm_dial.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 2b7f0b03e9..de16ba3fbb 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -361,9 +361,16 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } var triggerDial <-chan time.Time + var triggerTimer *time.Timer triggerNow := make(chan time.Time) close(triggerNow) + defer func() { + if triggerTimer != nil { + triggerTimer.Stop() + } + }() + var nextDial []ma.Multiaddr active := 0 done := false @@ -504,7 +511,15 @@ loop: // select an appropriate delay for the next dial batch delay := s.delayForNextDial(lastDial) - triggerDial = time.After(delay) + if triggerTimer == nil { + triggerTimer = time.NewTimer(delay) + } else { + if !triggerTimer.Stop() && triggerDial != triggerTimer.C { + <-triggerTimer.C + } + triggerTimer.Reset(delay) + } + triggerDial = triggerTimer.C case res := <-resch: active-- From 5539ac088539fe462dee49e0597db88407b83293 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 17:46:14 +0300 Subject: [PATCH 14/30] kill dial jump delays --- p2p/net/swarm/swarm_dial.go | 43 ++----------------------------------- 1 file changed, 2 insertions(+), 41 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index de16ba3fbb..1fe37a1fdd 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -57,12 +57,6 @@ var ( ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer") ) -var ( - delayDialPrivateAddr = 1 * time.Millisecond - delayDialPublicAddr = 5 * time.Millisecond - delayDialRelayAddr = 10 * time.Millisecond -) - // DialAttempts governs how many times a goroutine will try to dial a given peer. // Note: this is down to one, as we have _too many dials_ atm. To add back in, // add loop back in Dial(.) @@ -360,17 +354,10 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } } - var triggerDial <-chan time.Time - var triggerTimer *time.Timer - triggerNow := make(chan time.Time) + var triggerDial <-chan struct{} + triggerNow := make(chan struct{}) close(triggerNow) - defer func() { - if triggerTimer != nil { - triggerTimer.Stop() - } - }() - var nextDial []ma.Multiaddr active := 0 done := false @@ -501,26 +488,12 @@ loop: active++ } - lastDial := nextDial[last] nextDial = nextDial[next:] if !dialed || len(nextDial) == 0 { // we didn't dial anything because of backoff or we don't have any more addresses triggerDial = nil - continue loop } - // select an appropriate delay for the next dial batch - delay := s.delayForNextDial(lastDial) - if triggerTimer == nil { - triggerTimer = time.NewTimer(delay) - } else { - if !triggerTimer.Stop() && triggerDial != triggerTimer.C { - <-triggerTimer.C - } - triggerTimer.Reset(delay) - } - triggerDial = triggerTimer.C - case res := <-resch: active-- @@ -636,18 +609,6 @@ func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool { return manet.IsPrivateAddr(b) } -func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { - if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { - return delayDialRelayAddr - } - - if manet.IsPrivateAddr(addr) { - return delayDialPrivateAddr - } - - return delayDialPublicAddr -} - func (s *Swarm) canDial(addr ma.Multiaddr) bool { t := s.TransportForDialing(addr) return t != nil && t.CanDial(addr) From 91c366808709be2424c2f5de7caf9622d88b744a Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:10:43 +0300 Subject: [PATCH 15/30] add TestDialExistingConnection --- p2p/net/swarm/dial_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 9fc5df4189..c8b8bbe9b1 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -524,3 +524,29 @@ func TestDialPeerFailed(t *testing.T) { t.Errorf("expected %d errors, got %d", expectedErrorsCount, len(dialErr.DialErrors)) } } + +func TestDialExistingConnection(t *testing.T) { + ctx := context.Background() + + swarms := makeSwarms(ctx, t, 2) + defer closeSwarms(swarms) + s1 := swarms[0] + s2 := swarms[1] + + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL) + + c1, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + c2, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + if c1 != c2 { + t.Fatal("expecting the same connection from both dials") + } + +} From ad3f67685ce2dd4127011bd9ca0f008fdd761e73 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:18:22 +0300 Subject: [PATCH 16/30] do a last ditch check for acceptable connections before dispatching a dial error --- p2p/net/swarm/swarm_dial.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 1fe37a1fdd..2d3255b4a6 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -341,7 +341,14 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial delete(pr.addrs, ad.addr) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error - pr.req.Resch <- DialResponse{Err: pr.err} + // but first do a last one check in case an acceptable connection has landed from + // a simultaneous dial that started later and added new acceptable addrs + c := s.bestAcceptableConnToPeer(pr.req.Ctx, p) + if c != nil { + pr.req.Resch <- DialResponse{Conn: c} + } else { + pr.req.Resch <- DialResponse{Err: pr.err} + } delete(requests, reqno) } } From c20de2016f79ef68d02835d4dfcc5500ca1c18f4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:30:30 +0300 Subject: [PATCH 17/30] merge dial contexts where possible --- p2p/net/swarm/swarm_dial.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 2d3255b4a6..513ce1c617 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -320,6 +320,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial conn *Conn err error requests []int + dialed bool } reqno := 0 @@ -454,6 +455,9 @@ loop: requests[reqno] = pr for _, ad := range tojoin { + if !ad.dialed { + ad.ctx = s.mergeDialContexts(ad.ctx, req.Ctx) + } ad.requests = append(ad.requests, reqno) } @@ -490,6 +494,7 @@ loop: continue } + ad.dialed = true dialed = true last = i active++ @@ -581,6 +586,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er return goodAddrs, nil } +func (s *Swarm) mergeDialContexts(a, b context.Context) context.Context { + dialCtx := a + + if simConnect, reason := network.GetSimultaneousConnect(b); simConnect { + if simConnect, _ := network.GetSimultaneousConnect(a); !simConnect { + dialCtx = network.WithSimultaneousConnect(dialCtx, reason) + } + } + + return dialCtx +} + func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { From d2bc4f43119f9ec074b5f906bb51617fbc62e8db Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:55:52 +0300 Subject: [PATCH 18/30] add TestDialSimultaneousJoin test --- p2p/net/swarm/dial_test.go | 101 +++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index c8b8bbe9b1..86390d6254 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -9,6 +9,7 @@ import ( addrutil "github.com/libp2p/go-addr-util" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/transport" @@ -548,5 +549,105 @@ func TestDialExistingConnection(t *testing.T) { if c1 != c2 { t.Fatal("expecting the same connection from both dials") } +} + +func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { + lst, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatal(err) + } + addr, err := manet.FromNetAddr(lst.Addr()) + if err != nil { + t.Fatal(err) + } + addrs := []ma.Multiaddr{addr} + addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil) + if err != nil { + t.Fatal(err) + } + return addrs, lst + +} + +func TestDialSimultaneousJoin(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + swarms := makeSwarms(ctx, t, 2) + s1 := swarms[0] + s2 := swarms[1] + defer s1.Close() + defer s2.Close() + + s2silentAddrs, s2silentListener := newSilentListener(t) + go acceptAndHang(s2silentListener) + + connch := make(chan network.Conn, 512) + + // start a dial to s2 through the silent addr + go func() { + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL) + + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + t.Logf("first dial succedded; conn: %+v", c) + + connch <- c + }() + + // wait a bit for the dial to take hold + time.Sleep(100 * time.Millisecond) + + // start a second dial to s2 that uses the real s2 addrs + go func() { + s2addrs, err := s2.InterfaceListenAddresses() + if err != nil { + t.Fatal(err) + } + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL) + + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + t.Logf("second dial succedded; conn: %+v", c) + + connch <- c + }() + + // wait for the second dial to finish + c2 := <-connch + + // start a third dial to s2, this should get the existing connection from the successful dial + go func() { + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + t.Logf("third dial succedded; conn: %+v", c) + + connch <- c + }() + + c3 := <-connch + + if c2 != c3 { + t.Fatal("expected c2 and c3 to be the same") + } + + // next, the first dial to s2, using the silent addr should timeout; at this point the dial + // will error but the last chance check will see the existing connection and return it + select { + case c1 := <-connch: + if c1 != c2 { + t.Fatal("expected c1 and c2 to be the same") + } + case <-time.After(2 * transport.DialTimeout): + t.Fatal("no connection from first dial") + } } From 167d64587bce771cede74d9df0d07b64253fc349 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 20:04:42 +0300 Subject: [PATCH 19/30] don't add backoff if we have successfully connected for consistency with the old dialer behaviour. --- p2p/net/swarm/swarm_dial.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 513ce1c617..1a5cb1208a 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -368,7 +368,8 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial var nextDial []ma.Multiaddr active := 0 - done := false + done := false // true when the request channel has been closed + connected := false // true when a connection has been successfully established resch := make(chan dialResult) @@ -509,6 +510,10 @@ loop: case res := <-resch: active-- + if res.Conn != nil { + connected = true + } + if done && active == 0 { if res.Conn != nil { // we got an actual connection, but the dial has been cancelled @@ -556,7 +561,9 @@ loop: } // it must be an error -- add backoff if applicable and dispatch - if res.Err != context.Canceled { + if res.Err != context.Canceled && !connected { + // we only add backoff if there has not been a successful connection + // for consistency with the old dialer behavior. s.backf.AddBackoff(p, res.Addr) } From 80b33ec0b4a910f3584d7f6cf08d828ab9cf8973 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 20:14:52 +0300 Subject: [PATCH 20/30] fix TestConnectednessCorrect we might get more connections because simultaneous dials can succeed and we have both TCP and QUIC addrs by default --- p2p/net/swarm/swarm_net_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/net/swarm/swarm_net_test.go b/p2p/net/swarm/swarm_net_test.go index 2ba64edb96..64121bb1b4 100644 --- a/p2p/net/swarm/swarm_net_test.go +++ b/p2p/net/swarm/swarm_net_test.go @@ -57,8 +57,8 @@ func TestConnectednessCorrect(t *testing.T) { t.Fatal("expected net 0 to have two peers") } - if len(nets[2].Conns()) != 2 { - t.Fatal("expected net 2 to have two conns") + if len(nets[2].Peers()) != 2 { + t.Fatal("expected net 2 to have two peers") } if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 { From 0af34e70f0a5010157f329a2f501baa2527035c4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 23:04:42 +0300 Subject: [PATCH 21/30] don't store the active dial if it errors while starting the worker --- p2p/net/swarm/dial_sync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 32196cb2d5..ae3578a53a 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -97,13 +97,14 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { reqch: make(chan DialRequest), ds: ds, } - ds.dials[p] = actd err := ds.dialWorker(adctx, p, actd.reqch) if err != nil { cancel() return nil, err } + + ds.dials[p] = actd } // increase ref count before dropping dialsLk From 302743444250277bf1c4100c794dbb49e4306aa8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 23:04:49 +0300 Subject: [PATCH 22/30] add TestSelfDial --- p2p/net/swarm/dial_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 86390d6254..26bf47cac1 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -651,3 +651,23 @@ func TestDialSimultaneousJoin(t *testing.T) { t.Fatal("no connection from first dial") } } + +func TestDialSelf(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + swarms := makeSwarms(ctx, t, 2) + s1 := swarms[0] + defer s1.Close() + + _, err := s1.DialPeer(ctx, s1.LocalPeer()) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } + + // do it twice to make sure we get a new active dial object that fails again + _, err = s1.DialPeer(ctx, s1.LocalPeer()) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } +} From 7db6ec9ea181fd49d18ff7c1ff9d9cf5fc179a8a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:13:03 +0300 Subject: [PATCH 23/30] make DialRequest and DialResponse private --- p2p/net/swarm/dial_sync.go | 14 +++++------ p2p/net/swarm/dial_sync_test.go | 20 +++++++--------- p2p/net/swarm/swarm_dial.go | 42 ++++++++++++++++----------------- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index ae3578a53a..24781dd19d 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -12,8 +12,8 @@ import ( // TODO: change this text when we fix the bug var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") -// DialFunc is the type of function expected by DialSync. -type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) error +// DialWorerFunc is used by DialSync to spawn a new dial worker +type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -38,7 +38,7 @@ type activeDial struct { ctx context.Context cancel func() - reqch chan DialRequest + reqch chan dialRequest ds *DialSync } @@ -64,16 +64,16 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { dialCtx = network.WithSimultaneousConnect(dialCtx, reason) } - resch := make(chan DialResponse, 1) + resch := make(chan dialResponse, 1) select { - case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}: + case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: case <-ctx.Done(): return nil, ctx.Err() } select { case res := <-resch: - return res.Conn, res.Err + return res.conn, res.err case <-ctx.Done(): return nil, ctx.Err() } @@ -94,7 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { id: p, ctx: adctx, cancel: cancel, - reqch: make(chan DialRequest), + reqch: make(chan dialRequest), ds: ds, } diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go index f1a9f8a539..e414dd529e 100644 --- a/p2p/net/swarm/dial_sync_test.go +++ b/p2p/net/swarm/dial_sync_test.go @@ -1,4 +1,4 @@ -package swarm_test +package swarm import ( "context" @@ -7,8 +7,6 @@ import ( "testing" "time" - . "github.com/libp2p/go-libp2p-swarm" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,7 +14,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { dfcalls <- struct{}{} go func() { defer cancel() @@ -29,9 +27,9 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} select { case <-ch: - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} case <-ctx.Done(): - req.Resch <- DialResponse{Err: ctx.Err()} + req.resch <- dialResponse{err: ctx.Err()} return } case <-ctx.Done(): @@ -189,7 +187,7 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -199,9 +197,9 @@ func TestFailFirst(t *testing.T) { } if count > 0 { - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} } else { - req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")} } count++ @@ -236,7 +234,7 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -245,7 +243,7 @@ func TestStressActiveDial(t *testing.T) { return } - req.Resch <- DialResponse{} + req.resch <- dialResponse{} case <-ctx.Done(): return } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 1a5cb1208a..ab95a46127 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -285,18 +285,18 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// -type DialRequest struct { - Ctx context.Context - Resch chan DialResponse +type dialRequest struct { + ctx context.Context + resch chan dialResponse } -type DialResponse struct { - Conn *Conn - Err error +type dialResponse struct { + conn *Conn + err error } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { if p == s.local { return ErrDialToSelf } @@ -305,11 +305,11 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequ return nil } -func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { defer s.limiter.clearAllPeerDials(p) type pendRequest struct { - req DialRequest // the original request + req dialRequest // the original request err *DialError // dial error accumulator addrs map[ma.Multiaddr]struct{} // pending addr dials } @@ -344,11 +344,11 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial // all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from // a simultaneous dial that started later and added new acceptable addrs - c := s.bestAcceptableConnToPeer(pr.req.Ctx, p) + c := s.bestAcceptableConnToPeer(pr.req.ctx, p) if c != nil { - pr.req.Resch <- DialResponse{Conn: c} + pr.req.resch <- dialResponse{conn: c} } else { - pr.req.Resch <- DialResponse{Err: pr.err} + pr.req.resch <- dialResponse{err: pr.err} } delete(requests, reqno) } @@ -390,15 +390,15 @@ loop: return } - c := s.bestAcceptableConnToPeer(req.Ctx, p) + c := s.bestAcceptableConnToPeer(req.ctx, p) if c != nil { - req.Resch <- DialResponse{Conn: c} + req.resch <- dialResponse{conn: c} continue loop } - addrs, err := s.addrsForDial(req.Ctx, p) + addrs, err := s.addrsForDial(req.ctx, p) if err != nil { - req.Resch <- DialResponse{Err: err} + req.resch <- dialResponse{err: err} continue loop } @@ -430,7 +430,7 @@ loop: if ad.conn != nil { // dial to this addr was successful, complete the request - req.Resch <- DialResponse{Conn: ad.conn} + req.resch <- dialResponse{conn: ad.conn} continue loop } @@ -447,7 +447,7 @@ loop: if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored - req.Resch <- DialResponse{Err: pr.err} + req.resch <- dialResponse{err: pr.err} continue loop } @@ -457,14 +457,14 @@ loop: for _, ad := range tojoin { if !ad.dialed { - ad.ctx = s.mergeDialContexts(ad.ctx, req.Ctx) + ad.ctx = s.mergeDialContexts(ad.ctx, req.ctx) } ad.requests = append(ad.requests, reqno) } if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -550,7 +550,7 @@ loop: continue } - pr.req.Resch <- DialResponse{Conn: conn} + pr.req.resch <- dialResponse{conn: conn} delete(requests, reqno) } From c55e86c4d210bc2802d6694d1da9e255cc3906e9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:16:06 +0300 Subject: [PATCH 24/30] add comment about the necessity of removing the address tracking when a dial backoff occurs --- p2p/net/swarm/swarm_dial.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index ab95a46127..29e3a8d70c 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -356,7 +356,12 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dial ad.requests = nil - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests. + // this is necessary to support active listen scenarios, where a new dial comes in while + // another dial is in progress, and needs to do a direct connection without inhibitions from + // dial backoff. + // it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff + // regresses without this. if err == ErrDialBackoff { delete(pending, ad.addr) } From ace5e258c941df093bcad4ed576b53d55e383c70 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:20:25 +0300 Subject: [PATCH 25/30] remove dial batching --- p2p/net/swarm/swarm_dial.go | 47 +++---------------------------------- 1 file changed, 3 insertions(+), 44 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 29e3a8d70c..c656df5a26 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -480,37 +480,17 @@ loop: } case <-triggerDial: - // we dial batches of addresses together, logically belonging to the same batch - // after a batch of addresses has been dialed, we add a delay before initiating the next batch - dialed := false - last := 0 - next := 0 - for i, addr := range nextDial { - if dialed && !s.sameAddrBatch(nextDial[last], addr) { - break - } - - next = i + 1 - + for _, addr := range nextDial { // spawn the dial ad := pending[addr] err := s.dialNextAddr(ad.ctx, p, addr, resch) if err != nil { dispatchError(ad, err) - continue } - - ad.dialed = true - dialed = true - last = i - active++ } - nextDial = nextDial[next:] - if !dialed || len(nextDial) == 0 { - // we didn't dial anything because of backoff or we don't have any more addresses - triggerDial = nil - } + nextDial = nil + triggerDial = nil case res := <-resch: active-- @@ -624,27 +604,6 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, return nil } -func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool { - // is it a relay addr? - if s.IsRelayAddr(a) { - return s.IsRelayAddr(b) - } - - // is it an expensive addr? - if s.IsExpensiveAddr(a) { - return s.IsExpensiveAddr(b) - } - - // is it a public addr? - if !manet.IsPrivateAddr(a) { - return !manet.IsPrivateAddr(b) && - s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b) - } - - // it's a private addr - return manet.IsPrivateAddr(b) -} - func (s *Swarm) canDial(addr ma.Multiaddr) bool { t := s.TransportForDialing(addr) return t != nil && t.CanDial(addr) From c2e44d4b3d6b116577d08d6d813b25c9c6579225 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:41:58 +0300 Subject: [PATCH 26/30] add new TestDialSelf so that we exercise the dialWorker dial to self error path --- p2p/net/swarm/dial_sync_test.go | 23 +++++++++++++++++++++++ p2p/net/swarm/dial_test.go | 8 +------- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go index e414dd529e..e5a7da69eb 100644 --- a/p2p/net/swarm/dial_sync_test.go +++ b/p2p/net/swarm/dial_sync_test.go @@ -270,3 +270,26 @@ func TestStressActiveDial(t *testing.T) { wg.Wait() } + +func TestDialSelf(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + self := peer.ID("ABC") + s := NewSwarm(ctx, self, nil, nil) + defer s.Close() + + ds := NewDialSync(s.dialWorker) + + // this should fail + _, err := ds.DialLock(ctx, self) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } + + // do it twice to make sure we get a new active dial object that fails again + _, err = ds.DialLock(ctx, self) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } +} diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 26bf47cac1..2a966a4662 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -652,7 +652,7 @@ func TestDialSimultaneousJoin(t *testing.T) { } } -func TestDialSelf(t *testing.T) { +func TestDialSelf2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -664,10 +664,4 @@ func TestDialSelf(t *testing.T) { if err != ErrDialToSelf { t.Fatal("expected error from self dial") } - - // do it twice to make sure we get a new active dial object that fails again - _, err = s1.DialPeer(ctx, s1.LocalPeer()) - if err != ErrDialToSelf { - t.Fatal("expected error from self dial") - } } From c7e4304bd428954e742b622582d0fa9bee7c6e46 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 15:36:55 +0300 Subject: [PATCH 27/30] make DialWorkerFunc, NewDialSync private they work with private data types, so there is no point in having them public --- p2p/net/swarm/dial_sync.go | 8 ++++---- p2p/net/swarm/dial_sync_test.go | 18 ++++++++---------- p2p/net/swarm/swarm.go | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 24781dd19d..3179016661 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -13,10 +13,10 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialWorerFunc is used by DialSync to spawn a new dial worker -type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error +type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error -// NewDialSync constructs a new DialSync -func NewDialSync(worker DialWorkerFunc) *DialSync { +// newDialSync constructs a new DialSync +func newDialSync(worker dialWorkerFunc) *DialSync { return &DialSync{ dials: make(map[peer.ID]*activeDial), dialWorker: worker, @@ -28,7 +28,7 @@ func NewDialSync(worker DialWorkerFunc) *DialSync { type DialSync struct { dials map[peer.ID]*activeDial dialsLk sync.Mutex - dialWorker DialWorkerFunc + dialWorker dialWorkerFunc } type activeDial struct { diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go index e5a7da69eb..59ace9ae67 100644 --- a/p2p/net/swarm/dial_sync_test.go +++ b/p2p/net/swarm/dial_sync_test.go @@ -10,7 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{}) { +func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}) { dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) @@ -48,7 +48,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} func TestBasicDialSync(t *testing.T) { df, done, _, callsch := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -86,7 +86,7 @@ func TestBasicDialSync(t *testing.T) { func TestDialSyncCancel(t *testing.T) { df, done, _, dcall := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -137,7 +137,7 @@ func TestDialSyncCancel(t *testing.T) { func TestDialSyncAllCancel(t *testing.T) { df, done, dctx, _ := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -211,7 +211,7 @@ func TestFailFirst(t *testing.T) { return nil } - ds := NewDialSync(f) + ds := newDialSync(f) p := peer.ID("testing") @@ -234,7 +234,7 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { + ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -279,16 +279,14 @@ func TestDialSelf(t *testing.T) { s := NewSwarm(ctx, self, nil, nil) defer s.Close() - ds := NewDialSync(s.dialWorker) - // this should fail - _, err := ds.DialLock(ctx, self) + _, err := s.dsync.DialLock(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } // do it twice to make sure we get a new active dial object that fails again - _, err = ds.DialLock(ctx, self) + _, err = s.dsync.DialLock(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index c57c563c25..95b164d13c 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = NewDialSync(s.dialWorker) + s.dsync = newDialSync(s.dialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) From bc47c2f657d3b983179a59a76702717e405e91de Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:11:25 +0300 Subject: [PATCH 28/30] rename dialWorker to startDialWorker --- p2p/net/swarm/swarm.go | 2 +- p2p/net/swarm/swarm_dial.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 95b164d13c..1abe12273e 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = newDialSync(s.dialWorker) + s.dsync = newDialSync(s.startDialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index c656df5a26..d60c9f39bb 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -295,8 +295,8 @@ type dialResponse struct { err error } -// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { +// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials +func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { if p == s.local { return ErrDialToSelf } From 4f223e98dfc9861e3b449ef17ee7a0eda4c7b349 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:14:59 +0300 Subject: [PATCH 29/30] make addr utility funcs standalone and not exported --- p2p/net/swarm/swarm_dial.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index d60c9f39bb..3cc3275ef8 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -621,10 +621,10 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { // UDP > TCP func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { addrTier := func(a ma.Multiaddr) (tier int) { - if s.IsRelayAddr(a) { + if isRelayAddr(a) { tier |= 0b1000 } - if s.IsExpensiveAddr(a) { + if isExpensiveAddr(a) { tier |= 0b0100 } if !manet.IsPrivateAddr(a) { @@ -741,13 +741,13 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { return err1 == nil || err2 == nil } -func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool { +func isExpensiveAddr(addr ma.Multiaddr) bool { _, err1 := addr.ValueForProtocol(ma.P_WS) _, err2 := addr.ValueForProtocol(ma.P_WSS) return err1 == nil || err2 == nil } -func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool { +func isRelayAddr(addr ma.Multiaddr) bool { _, err := addr.ValueForProtocol(ma.P_CIRCUIT) return err == nil } From 78037d70cb2c5d1205bc07f4a3efa1c6e7d6a7fe Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:31:29 +0300 Subject: [PATCH 30/30] make IsFdConsumingAddr a standalone utility func --- p2p/net/swarm/swarm.go | 2 +- p2p/net/swarm/swarm_dial.go | 4 +-- p2p/net/swarm/swarm_test.go | 48 --------------------------------- p2p/net/swarm/util_test.go | 53 +++++++++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 51 deletions(-) create mode 100644 p2p/net/swarm/util_test.go diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 1abe12273e..00c43f7a6f 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -122,7 +122,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } s.dsync = newDialSync(s.startDialWorker) - s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) + s.limiter = newDialLimiter(s.dialAddr, isFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) s.backf.init(s.ctx) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 3cc3275ef8..14129257be 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -630,7 +630,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { if !manet.IsPrivateAddr(a) { tier |= 0b0010 } - if s.IsFdConsumingAddr(a) { + if isFdConsumingAddr(a) { tier |= 0b0001 } @@ -726,7 +726,7 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra // A Non-circuit address which has the TCP/UNIX protocol is deemed FD consuming. // For a circuit-relay address, we look at the address of the relay server/proxy // and use the same logic as above to decide. -func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { +func isFdConsumingAddr(addr ma.Multiaddr) bool { first, _ := ma.SplitFunc(addr, func(c ma.Component) bool { return c.Protocol().Code == ma.P_CIRCUIT }) diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 9b1e9c42d7..4e9801ad08 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -15,7 +15,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p-core/test" . "github.com/libp2p/go-libp2p-swarm" . "github.com/libp2p/go-libp2p-swarm/testing" @@ -387,53 +386,6 @@ func TestConnectionGating(t *testing.T) { } } -func TestIsFdConsuming(t *testing.T) { - tcs := map[string]struct { - addr string - isFdConsuming bool - }{ - "tcp": { - addr: "/ip4/127.0.0.1/tcp/20", - isFdConsuming: true, - }, - "quic": { - addr: "/ip4/127.0.0.1/udp/0/quic", - isFdConsuming: false, - }, - "addr-without-registered-transport": { - addr: "/ip4/127.0.0.1/tcp/20/ws", - isFdConsuming: true, - }, - "relay-tcp": { - addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - "relay-quic": { - addr: fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: false, - }, - "relay-without-serveraddr": { - addr: fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - "relay-without-registered-transport-server": { - addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - } - - ctx := context.Background() - sw := GenSwarm(t, ctx) - sk := sw.Peerstore().PrivKey(sw.LocalPeer()) - require.NotNil(t, sk) - - for name := range tcs { - maddr, err := ma.NewMultiaddr(tcs[name].addr) - require.NoError(t, err, name) - require.Equal(t, tcs[name].isFdConsuming, sw.IsFdConsumingAddr(maddr), name) - } -} - func TestNoDial(t *testing.T) { ctx := context.Background() swarms := makeSwarms(ctx, t, 2) diff --git a/p2p/net/swarm/util_test.go b/p2p/net/swarm/util_test.go new file mode 100644 index 0000000000..11124adb27 --- /dev/null +++ b/p2p/net/swarm/util_test.go @@ -0,0 +1,53 @@ +package swarm + +import ( + "fmt" + "testing" + + "github.com/libp2p/go-libp2p-core/test" + ma "github.com/multiformats/go-multiaddr" + + "github.com/stretchr/testify/require" +) + +func TestIsFdConsuming(t *testing.T) { + tcs := map[string]struct { + addr string + isFdConsuming bool + }{ + "tcp": { + addr: "/ip4/127.0.0.1/tcp/20", + isFdConsuming: true, + }, + "quic": { + addr: "/ip4/127.0.0.1/udp/0/quic", + isFdConsuming: false, + }, + "addr-without-registered-transport": { + addr: "/ip4/127.0.0.1/tcp/20/ws", + isFdConsuming: true, + }, + "relay-tcp": { + addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + "relay-quic": { + addr: fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: false, + }, + "relay-without-serveraddr": { + addr: fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + "relay-without-registered-transport-server": { + addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + } + + for name := range tcs { + maddr, err := ma.NewMultiaddr(tcs[name].addr) + require.NoError(t, err, name) + require.Equal(t, tcs[name].isFdConsuming, isFdConsumingAddr(maddr), name) + } +}