Skip to content

Commit

Permalink
cmd/go: add a GODEBUG to limit the number of concurrent network conne…
Browse files Browse the repository at this point in the history
…ctions

I implemented this in order to debug connection failures on a
new-to-me VM development environment that uses Cloud NAT. It doesn't
directly fix the bug, but perhaps folks will find it useful to
diagnose port-exhaustion-related flakiness in other environments.

For #52545.

Change-Id: Icd3f13dcf62e718560c4f4a965a4df7c1bd785ce
Reviewed-on: https://go-review.googlesource.com/c/go/+/473277
Run-TryBot: Bryan Mills <bcmills@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Matloob <matloob@golang.org>
Auto-Submit: Bryan Mills <bcmills@google.com>
  • Loading branch information
Bryan C. Mills authored and gopherbot committed May 19, 2023
1 parent 8f1031d commit d0c72c2
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 11 deletions.
62 changes: 54 additions & 8 deletions src/cmd/go/go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"io/fs"
"log"
"math"
"os"
"os/exec"
"path/filepath"
Expand All @@ -29,6 +30,7 @@ import (
"testing"
"time"

"cmd/go/internal/base"
"cmd/go/internal/cache"
"cmd/go/internal/cfg"
"cmd/go/internal/robustio"
Expand Down Expand Up @@ -61,6 +63,12 @@ var (
cgoEnabled string // raw value from 'go env CGO_ENABLED'
)

// netTestSem is a semaphore limiting the number of tests that may use the
// external network in parallel. If non-nil, it contains one buffer slot per
// test (send to acquire), with a low enough limit that the overall number of
// connections (summed across subprocesses) stays at or below base.NetLimit.
var netTestSem chan struct{}

var exeSuffix string = func() string {
if runtime.GOOS == "windows" {
return ".exe"
Expand Down Expand Up @@ -282,6 +290,17 @@ func TestMain(m *testing.M) {
}
}

if n, limited := base.NetLimit(); limited && n > 0 {
// Split the network limit into chunks, so that each parallel script can
// have one chunk. We want to run as many parallel scripts as possible, but
// also want to give each script as high a limit as possible.
// We arbitrarily split by sqrt(n) to try to balance those two goals.
netTestLimit := int(math.Sqrt(float64(n)))
netTestSem = make(chan struct{}, netTestLimit)
reducedLimit := fmt.Sprintf(",%s=%d", base.NetLimitGodebug.Name(), n/netTestLimit)
os.Setenv("GODEBUG", os.Getenv("GODEBUG")+reducedLimit)
}

// Don't let these environment variables confuse the test.
os.Setenv("GOENV", "off")
os.Unsetenv("GOFLAGS")
Expand Down Expand Up @@ -369,6 +388,7 @@ type testgoData struct {
tempdir string
ran bool
inParallel bool
hasNet bool
stdout, stderr bytes.Buffer
execDir string // dir for tg.run
}
Expand Down Expand Up @@ -411,6 +431,9 @@ func (tg *testgoData) parallel() {
if tg.ran {
tg.t.Fatal("internal testsuite error: call to parallel after run")
}
if tg.hasNet {
tg.t.Fatal("internal testsuite error: call to parallel after acquireNet")
}
for _, e := range tg.env {
if strings.HasPrefix(e, "GOROOT=") || strings.HasPrefix(e, "GOPATH=") || strings.HasPrefix(e, "GOBIN=") {
val := e[strings.Index(e, "=")+1:]
Expand All @@ -423,6 +446,25 @@ func (tg *testgoData) parallel() {
tg.t.Parallel()
}

// acquireNet skips t if the network is unavailable, and otherwise acquires a
// netTestSem token for t to be released at the end of the test.
//
// t.Parallel must not be called after acquireNet.
func (tg *testgoData) acquireNet() {
tg.t.Helper()
if tg.hasNet {
return
}

testenv.MustHaveExternalNetwork(tg.t)
if netTestSem != nil {
netTestSem <- struct{}{}
tg.t.Cleanup(func() { <-netTestSem })
}
tg.setenv("TESTGONETWORK", "")
tg.hasNet = true
}

// pwd returns the current directory.
func (tg *testgoData) pwd() string {
tg.t.Helper()
Expand All @@ -444,9 +486,6 @@ func (tg *testgoData) sleep() {
// command.
func (tg *testgoData) setenv(name, val string) {
tg.t.Helper()
if tg.inParallel && (name == "GOROOT" || name == "GOPATH" || name == "GOBIN") && (strings.HasPrefix(val, "testdata") || strings.HasPrefix(val, "./testdata")) {
tg.t.Fatalf("internal testsuite error: call to setenv with testdata (%s=%s) after parallel", name, val)
}
tg.unsetenv(name)
tg.env = append(tg.env, name+"="+val)
}
Expand All @@ -455,7 +494,10 @@ func (tg *testgoData) setenv(name, val string) {
func (tg *testgoData) unsetenv(name string) {
if tg.env == nil {
tg.env = append([]string(nil), os.Environ()...)
tg.env = append(tg.env, "GO111MODULE=off")
tg.env = append(tg.env, "GO111MODULE=off", "TESTGONETWORK=panic")
if testing.Short() {
tg.env = append(tg.env, "TESTGOVCS=panic")
}
}
for i, v := range tg.env {
if strings.HasPrefix(v, name+"=") {
Expand Down Expand Up @@ -1012,12 +1054,13 @@ func TestNewReleaseRebuildsStalePackagesInGOPATH(t *testing.T) {

// cmd/go: custom import path checking should not apply to Go packages without import comment.
func TestIssue10952(t *testing.T) {
testenv.MustHaveExternalNetwork(t)
testenv.MustHaveExecPath(t, "git")

tg := testgo(t)
defer tg.cleanup()
tg.parallel()
tg.acquireNet()

tg.tempDir("src")
tg.setenv("GOPATH", tg.path("."))
const importPath = "github.com/zombiezen/go-get-issue-10952"
Expand All @@ -1029,12 +1072,13 @@ func TestIssue10952(t *testing.T) {

// Test git clone URL that uses SCP-like syntax and custom import path checking.
func TestIssue11457(t *testing.T) {
testenv.MustHaveExternalNetwork(t)
testenv.MustHaveExecPath(t, "git")

tg := testgo(t)
defer tg.cleanup()
tg.parallel()
tg.acquireNet()

tg.tempDir("src")
tg.setenv("GOPATH", tg.path("."))
const importPath = "rsc.io/go-get-issue-11457"
Expand All @@ -1054,12 +1098,13 @@ func TestIssue11457(t *testing.T) {
}

func TestGetGitDefaultBranch(t *testing.T) {
testenv.MustHaveExternalNetwork(t)
testenv.MustHaveExecPath(t, "git")

tg := testgo(t)
defer tg.cleanup()
tg.parallel()
tg.acquireNet()

tg.tempDir("src")
tg.setenv("GOPATH", tg.path("."))

Expand Down Expand Up @@ -1395,12 +1440,13 @@ func TestDefaultGOPATH(t *testing.T) {
}

func TestDefaultGOPATHGet(t *testing.T) {
testenv.MustHaveExternalNetwork(t)
testenv.MustHaveExecPath(t, "git")

tg := testgo(t)
defer tg.cleanup()
tg.parallel()
tg.acquireNet()

tg.setenv("GOPATH", "")
tg.tempDir("home")
tg.setenv(homeEnvName(), tg.path("home"))
Expand Down
84 changes: 84 additions & 0 deletions src/cmd/go/internal/base/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package base

import (
"fmt"
"internal/godebug"
"runtime"
"strconv"
"sync"
)

var NetLimitGodebug = godebug.New("#cmdgonetlimit")

// NetLimit returns the limit on concurrent network operations
// configured by GODEBUG=cmdgonetlimit, if any.
//
// A limit of 0 (indicated by 0, true) means that network operations should not
// be allowed.
func NetLimit() (int, bool) {
netLimitOnce.Do(func() {
s := NetLimitGodebug.Value()
if s == "" {
return
}

n, err := strconv.Atoi(s)
if err != nil {
Fatalf("invalid %s: %v", NetLimitGodebug.Name(), err)
}
if n < 0 {
// Treat negative values as unlimited.
return
}
netLimitSem = make(chan struct{}, n)
})

return cap(netLimitSem), netLimitSem != nil
}

// AcquireNet acquires a semaphore token for a network operation.
func AcquireNet() (release func(), err error) {
hasToken := false
if n, ok := NetLimit(); ok {
if n == 0 {
return nil, fmt.Errorf("network disabled by %v=%v", NetLimitGodebug.Name(), NetLimitGodebug.Value())
}
netLimitSem <- struct{}{}
hasToken = true
}

checker := new(netTokenChecker)
runtime.SetFinalizer(checker, (*netTokenChecker).panicUnreleased)

return func() {
if checker.released {
panic("internal error: net token released twice")
}
checker.released = true
if hasToken {
<-netLimitSem
}
runtime.SetFinalizer(checker, nil)
}, nil
}

var (
netLimitOnce sync.Once
netLimitSem chan struct{}
)

type netTokenChecker struct {
released bool
// We want to use a finalizer to check that all acquired tokens are returned,
// so we arbitrarily pad the tokens with a string to defeat the runtime's
// “tiny allocator”.
unusedAvoidTinyAllocator string
}

func (c *netTokenChecker) panicUnreleased() {
panic("internal error: net token acquired but not released")
}
23 changes: 22 additions & 1 deletion src/cmd/go/internal/modfetch/codehost/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"cmd/go/internal/base"
"cmd/go/internal/lockedfile"
"cmd/go/internal/par"
"cmd/go/internal/web"
Expand Down Expand Up @@ -241,7 +242,14 @@ func (r *gitRepo) loadRefs(ctx context.Context) (map[string]string, error) {
// The git protocol sends all known refs and ls-remote filters them on the client side,
// so we might as well record both heads and tags in one shot.
// Most of the time we only care about tags but sometimes we care about heads too.
release, err := base.AcquireNet()
if err != nil {
r.refsErr = err
return
}
out, gitErr := Run(ctx, r.dir, "git", "ls-remote", "-q", r.remote)
release()

if gitErr != nil {
if rerr, ok := gitErr.(*RunError); ok {
if bytes.Contains(rerr.Stderr, []byte("fatal: could not read Username")) {
Expand Down Expand Up @@ -531,7 +539,14 @@ func (r *gitRepo) stat(ctx context.Context, rev string) (info *RevInfo, err erro
ref = hash
refspec = hash + ":refs/dummy"
}
_, err := Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec)

release, err := base.AcquireNet()
if err != nil {
return nil, err
}
_, err = Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec)
release()

if err == nil {
return r.statLocal(ctx, rev, ref)
}
Expand Down Expand Up @@ -566,6 +581,12 @@ func (r *gitRepo) fetchRefsLocked(ctx context.Context) error {
// golang.org/issue/34266 and
// https://github.com/git/git/blob/4c86140027f4a0d2caaa3ab4bd8bfc5ce3c11c8a/transport.c#L1303-L1309.)

release, err := base.AcquireNet()
if err != nil {
return err
}
defer release()

if _, err := Run(ctx, r.dir, "git", "fetch", "-f", r.remote, "refs/heads/*:refs/heads/*", "refs/tags/*:refs/tags/*"); err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions src/cmd/go/internal/modfetch/codehost/svn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"path/filepath"
"strconv"
"time"

"cmd/go/internal/base"
)

func svnParseStat(rev, out string) (*RevInfo, error) {
Expand Down Expand Up @@ -66,6 +68,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
remotePath += "/" + subdir
}

release, err := base.AcquireNet()
if err != nil {
return err
}
out, err := Run(ctx, workDir, []string{
"svn", "list",
"--non-interactive",
Expand All @@ -75,6 +81,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
"--revision", rev,
"--", remotePath,
})
release()
if err != nil {
return err
}
Expand All @@ -98,6 +105,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
}
defer os.RemoveAll(exportDir) // best-effort

release, err = base.AcquireNet()
if err != nil {
return err
}
_, err = Run(ctx, workDir, []string{
"svn", "export",
"--non-interactive",
Expand All @@ -112,6 +123,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote
"--", remotePath,
exportDir,
})
release()
if err != nil {
return err
}
Expand Down
16 changes: 15 additions & 1 deletion src/cmd/go/internal/modfetch/codehost/vcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"cmd/go/internal/base"
"cmd/go/internal/lockedfile"
"cmd/go/internal/par"
"cmd/go/internal/str"
Expand Down Expand Up @@ -109,7 +110,14 @@ func newVCSRepo(ctx context.Context, vcs, remote string) (Repo, error) {
defer unlock()

if _, err := os.Stat(filepath.Join(r.dir, "."+vcs)); err != nil {
if _, err := Run(ctx, r.dir, cmd.init(r.remote)); err != nil {
release, err := base.AcquireNet()
if err != nil {
return nil, err
}
_, err = Run(ctx, r.dir, cmd.init(r.remote))
release()

if err != nil {
os.RemoveAll(r.dir)
return nil, err
}
Expand Down Expand Up @@ -355,7 +363,13 @@ func (r *vcsRepo) Stat(ctx context.Context, rev string) (*RevInfo, error) {

func (r *vcsRepo) fetch(ctx context.Context) {
if len(r.cmd.fetch) > 0 {
release, err := base.AcquireNet()
if err != nil {
r.fetchErr = err
return
}
_, r.fetchErr = Run(ctx, r.dir, r.cmd.fetch)
release()
}
}

Expand Down
Loading

0 comments on commit d0c72c2

Please sign in to comment.