Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autonat: don't change status on dial request refused #2225

Merged
merged 6 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type AmbientAutoNAT struct {
ctxCancel context.CancelFunc // is closed when Close is called
backgroundRunning chan struct{} // is closed when the background go routine exits

inboundConn chan network.Conn
observations chan network.Reachability
inboundConn chan network.Conn
dialResponses chan error
// status is an autoNATResult reflecting current status.
status atomic.Pointer[network.Reachability]
// Reflects the confidence on of the NATStatus being private, as a single
Expand Down Expand Up @@ -107,7 +107,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan network.Reachability, 1),
dialResponses: make(chan error, 1),

emitReachabilityChanged: emitReachabilityChanged,
service: service,
Expand Down Expand Up @@ -168,6 +168,7 @@ func (as *AmbientAutoNAT) background() {
timer := time.NewTimer(delay)
defer timer.Stop()
timerRunning := true
retryProbe := false
for {
select {
// new inbound connection.
Expand Down Expand Up @@ -198,15 +199,20 @@ func (as *AmbientAutoNAT) background() {
}

// probe finished.
case result, ok := <-as.observations:
case err, ok := <-as.dialResponses:
if !ok {
return
}
as.recordObservation(result)
if IsDialRefused(err) {
retryProbe = true
} else {
as.handleDialResponse(err)
}
case <-timer.C:
peer := as.getPeerToProbe()
as.tryProbe(peer)
timerRunning = false
retryProbe = false
case <-as.ctx.Done():
return
}
Expand All @@ -215,7 +221,7 @@ func (as *AmbientAutoNAT) background() {
if timerRunning && !timer.Stop() {
<-timer.C
}
timer.Reset(as.scheduleProbe())
timer.Reset(as.scheduleProbe(retryProbe))
timerRunning = true
}
}
Expand All @@ -230,10 +236,11 @@ func (as *AmbientAutoNAT) cleanupRecentProbes() {
}

// scheduleProbe calculates when the next probe should be scheduled for.
func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
func (as *AmbientAutoNAT) scheduleProbe(retryProbe bool) time.Duration {
// Our baseline is a probe every 'AutoNATRefreshInterval'
// This is modulated by:
// * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval'
// * if we are in an unknown state, have low confidence, or we want to retry because a probe was refused that
// should drop to 'AutoNATRetryInterval'
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
Expand All @@ -249,7 +256,9 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
}
if !as.lastProbe.IsZero() {
untilNext := as.config.refreshInterval
if currentStatus == network.ReachabilityUnknown {
if retryProbe {
untilNext = as.config.retryInterval
} else if currentStatus == network.ReachabilityUnknown {
untilNext = as.config.retryInterval
} else if as.confidence < maxConfidence {
untilNext = as.config.retryInterval
Expand All @@ -269,8 +278,24 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
return nextProbe.Sub(fixedNow)
}

// Update the current status based on an observed result.
// handleDialResponse updates the current status based on dial response.
func (as *AmbientAutoNAT) handleDialResponse(dialErr error) {
var observation network.Reachability
switch {
case dialErr == nil:
observation = network.ReachabilityPublic
case IsDialError(dialErr):
observation = network.ReachabilityPrivate
default:
observation = network.ReachabilityUnknown
}

as.recordObservation(observation)
}

// recordObservation updates NAT status and confidence
func (as *AmbientAutoNAT) recordObservation(observation network.Reachability) {

currentStatus := *as.status.Load()

if observation == network.ReachabilityPublic {
Expand Down Expand Up @@ -359,21 +384,10 @@ func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
defer cancel()

err := cli.DialBack(ctx, pi.ID)

var result network.Reachability
switch {
case err == nil:
log.Debugf("Dialback through %s successful", pi.ID.Pretty())
result = network.ReachabilityPublic
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result = network.ReachabilityPrivate
default:
result = network.ReachabilityUnknown
}
log.Debugf("Dialback through peer %s completed: err: %s", pi.ID, err)

select {
case as.observations <- result:
case as.dialResponses <- err:
case <-as.ctx.Done():
return
}
Expand Down Expand Up @@ -411,8 +425,7 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
return ""
}

shufflePeers(candidates)
return candidates[0]
return candidates[rand.Intn(len(candidates))]
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}

func (as *AmbientAutoNAT) Close() error {
Expand All @@ -424,13 +437,6 @@ func (as *AmbientAutoNAT) Close() error {
return nil
}

func shufflePeers(peers []peer.ID) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}

// Status returns the AutoNAT observed reachability status.
func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
Expand Down
55 changes: 54 additions & 1 deletion p2p/host/autonat/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ func sayPrivateStreamHandler(t *testing.T) network.StreamHandler {
}
}

func makeAutoNATRefuseDialRequest(t *testing.T) host.Host {
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
h.SetStreamHandler(AutoNATProto, sayRefusedStreamHandler(t))
return h
}

func sayRefusedStreamHandler(t *testing.T) network.StreamHandler {
return func(s network.Stream) {
defer s.Close()
r := pbio.NewDelimitedReader(s, network.MessageSizeMax)
if err := r.ReadMsg(&pb.Message{}); err != nil {
t.Error(err)
return
}
w := pbio.NewDelimitedWriter(s)
res := pb.Message{
Type: pb.Message_DIAL_RESPONSE.Enum(),
DialResponse: newDialResponseError(pb.Message_E_DIAL_REFUSED, "dial refused"),
}
w.WriteMsg(&res)
}
}

func makeAutoNATServicePublic(t *testing.T) host.Host {
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
h.SetStreamHandler(AutoNATProto, func(s network.Stream) {
Expand Down Expand Up @@ -154,7 +177,7 @@ func TestAutoNATPublictoPrivate(t *testing.T) {
// subscribe to AutoNat events
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err)
t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err)
}

if status := an.Status(); status != network.ReachabilityUnknown {
Expand Down Expand Up @@ -195,6 +218,36 @@ func TestAutoNATIncomingEvents(t *testing.T) {
}, 500*time.Millisecond, 10*time.Millisecond, "Expected probe due to identification of autonat service")
}

func TestAutoNATDialRefused(t *testing.T) {
hs := makeAutoNATServicePublic(t)
defer hs.Close()
hc, an := makeAutoNAT(t, hs)
defer hc.Close()
defer an.Close()

// subscribe to AutoNat events
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err)
}

if status := an.Status(); status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}

connect(t, hs, hc)
expectEvent(t, s, network.ReachabilityPublic, 10*time.Second)

hs.SetStreamHandler(AutoNATProto, sayRefusedStreamHandler(t))
hps := makeAutoNATRefuseDialRequest(t)
connect(t, hps, hc)
identifyAsServer(hps, hc)

require.Never(t, func() bool {
return an.Status() != network.ReachabilityPublic
}, 3*time.Second, 1*time.Second, "Expected probe to not change reachability from public")
}

func TestAutoNATObservationRecording(t *testing.T) {
hs := makeAutoNATServicePublic(t)
defer hs.Close()
Expand Down