Skip to content

Commit

Permalink
internal/pool: move the gce buildlet pool into a pool package
Browse files Browse the repository at this point in the history
This CL creates the internal/coordinator/pool package intended to
contain all buildlet pool implementations. In order to keep this
change small and carefully discover where the interactions are
between the gce buildlet pool and the rest of the coordinator
are, this change only moves the gce buildlet over to the new
package.

The next steps will be to move the rest of the buildlet pools
over to this package. After that we will restructure the
implementations themselves in order to increase test coverage
and increase the ease of testing.

Updates golang/go#36841
Updates golang/go#38337

Change-Id: If82ae1b584bd77c697aa84fadf9011c9e79fa409
Reviewed-on: https://go-review.googlesource.com/c/build/+/227141
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
  • Loading branch information
cagedmantis committed Apr 20, 2020
1 parent 6a8b9e1 commit 02e10ad
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 216 deletions.
160 changes: 59 additions & 101 deletions cmd/coordinator/coordinator.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions cmd/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"golang.org/x/build/buildenv"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/buildgo"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/maintner/maintnerd/apipb"
)

Expand Down Expand Up @@ -230,8 +231,9 @@ func TestFindWork(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
defer func(old *buildenv.Environment) { buildEnv = old }(buildEnv)
buildEnv = buildenv.Production
buildEnv := pool.GCEBuildEnv()
defer func(old *buildenv.Environment) { pool.SetGCEBuildEnv(old) }(buildEnv)
pool.SetGCEBuildEnv(buildenv.Production)
defer func() { buildgo.TestHookSnapshotExists = nil }()
buildgo.TestHookSnapshotExists = func(br *buildgo.BuilderRev) bool {
if strings.Contains(br.Name, "android") {
Expand Down
3 changes: 2 additions & 1 deletion cmd/coordinator/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"cloud.google.com/go/compute/metadata"
"golang.org/x/build/internal/buildgo"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/secret"
)

Expand All @@ -50,7 +51,7 @@ func dash(meth, cmd string, args url.Values, req, resp interface{}) error {
}
var r *http.Response
var err error
cmd = buildEnv.DashBase() + cmd + "?" + argsCopy.Encode()
cmd = pool.GCEBuildEnv().DashBase() + cmd + "?" + argsCopy.Encode()
switch meth {
case "GET":
if req != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/coordinator/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"text/template"

"golang.org/x/build/internal/buildgo"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/types"
)

Expand Down Expand Up @@ -50,8 +51,8 @@ func handleDoSomeWork(work chan<- buildgo.BuilderRev) func(w http.ResponseWriter

// Cap number of jobs that can be scheduled from debug UI. If
// buildEnv.MaxBuilds is zero, there is no cap.
if buildEnv.MaxBuilds > 0 && count > buildEnv.MaxBuilds {
count = buildEnv.MaxBuilds
if pool.GCEBuildEnv().MaxBuilds > 0 && count > pool.GCEBuildEnv().MaxBuilds {
count = pool.GCEBuildEnv().MaxBuilds
}
log.Printf("looking for %v work items for %q", count, mode)

Expand Down
47 changes: 24 additions & 23 deletions cmd/coordinator/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/sourcecache"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
Expand All @@ -44,12 +45,12 @@ var (

// initGCE must be called before initKube
func initKube() error {
if buildEnv.KubeBuild.MaxNodes == 0 {
if pool.GCEBuildEnv().KubeBuild.MaxNodes == 0 {
return errors.New("Kubernetes builders disabled due to KubeBuild.MaxNodes == 0")
}

// projectID was set by initGCE
registryPrefix += "/" + buildEnv.ProjectName
registryPrefix += "/" + pool.GCEBuildEnv().ProjectName
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope.")
}
Expand All @@ -58,19 +59,19 @@ func initKube() error {
defer cancel() // ctx is only used for discovery and connect; not retained.
var err error
buildletsKubeClient, err = gke.NewClient(ctx,
buildEnv.KubeBuild.Name,
gke.OptZone(buildEnv.ControlZone),
gke.OptProject(buildEnv.ProjectName),
gke.OptTokenSource(gcpCreds.TokenSource))
pool.GCEBuildEnv().KubeBuild.Name,
gke.OptZone(pool.GCEBuildEnv().ControlZone),
gke.OptProject(pool.GCEBuildEnv().ProjectName),
gke.OptTokenSource(pool.GCPCredentials().TokenSource))
if err != nil {
return err
}

goKubeClient, err = gke.NewClient(ctx,
buildEnv.KubeTools.Name,
gke.OptZone(buildEnv.ControlZone),
gke.OptProject(buildEnv.ProjectName),
gke.OptTokenSource(gcpCreds.TokenSource))
pool.GCEBuildEnv().KubeTools.Name,
gke.OptZone(pool.GCEBuildEnv().ControlZone),
gke.OptProject(pool.GCEBuildEnv().ProjectName),
gke.OptTokenSource(pool.GCPCredentials().TokenSource))
if err != nil {
return err
}
Expand Down Expand Up @@ -135,12 +136,12 @@ func (p *kubeBuildletPool) pollCapacityLoop() {
func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
nodes, err := buildletsKubeClient.GetNodes(ctx)
if err != nil {
log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", buildEnv.ProjectName, buildEnv.Region(), err)
log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
return
}
pods, err := buildletsKubeClient.GetPods(ctx)
if err != nil {
log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", buildEnv.ProjectName, buildEnv.Region(), err)
log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
return
}

Expand Down Expand Up @@ -209,7 +210,7 @@ func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {

}

func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg pool.Logger) (*buildlet.Client, error) {
hconf, ok := dashboard.Hosts[hostType]
if !ok || !hconf.IsContainer() {
return nil, fmt.Errorf("kubepool: invalid host type %q", hostType)
Expand All @@ -221,7 +222,7 @@ func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg
panic("expect non-nil buildletsKubeClient")
}

deleteIn, ok := ctx.Value(buildletTimeoutOpt{}).(time.Duration)
deleteIn, ok := ctx.Value(pool.BuildletTimeoutOpt{}).(time.Duration)
if !ok {
deleteIn = podDeleteTimeout
}
Expand All @@ -236,7 +237,7 @@ func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg
log.Printf("Creating Kubernetes pod %q for %s", podName, hostType)

bc, err := buildlet.StartPod(ctx, buildletsKubeClient, podName, hostType, buildlet.PodOpts{
ProjectID: buildEnv.ProjectName,
ProjectID: pool.GCEBuildEnv().ProjectName,
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s", hostType),
DeleteIn: deleteIn,
Expand Down Expand Up @@ -291,7 +292,7 @@ func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
fmt.Fprintf(w, "<ul>")
for i, pod := range active {
if i < show/2 || i >= len(active)-(show/2) {
fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.name, time.Since(pod.creation))
fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.Name, time.Since(pod.Creation))
} else if i == show/2 {
fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", len(active)-show, len(active))
}
Expand Down Expand Up @@ -353,16 +354,16 @@ func (p *kubeBuildletPool) podUsed(podName string) bool {
return ok
}

func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
func (p *kubeBuildletPool) podsActive() (ret []pool.ResourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, ph := range p.pods {
ret = append(ret, resourceTime{
name: name,
creation: ph.requestedAt,
ret = append(ret, pool.ResourceTime{
Name: name,
Creation: ph.requestedAt,
})
}
sort.Sort(byCreationTime(ret))
sort.Sort(pool.ByCreationTime(ret))
return ret
}

Expand Down Expand Up @@ -437,7 +438,7 @@ func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
}
if err == nil && time.Now().Unix() > unixDeadline {
stats.DeletedOld++
log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, buildEnv.ControlZone)
log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, pool.GCEBuildEnv().ControlZone)
err = buildletsKubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("cleanUpOldPods: problem deleting old pod %q: %v", pod.Name, err)
Expand Down Expand Up @@ -467,5 +468,5 @@ func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
}

func hasCloudPlatformScope() bool {
return hasScope(container.CloudPlatformScope)
return pool.HasScope(container.CloudPlatformScope)
}
13 changes: 7 additions & 6 deletions cmd/coordinator/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"cloud.google.com/go/datastore"

"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/types"
)

Expand All @@ -32,13 +33,13 @@ type ProcessRecord struct {
}

func updateInstanceRecord() {
if dsClient == nil {
if pool.GCEDSClient() == nil {
return
}
ctx := context.Background()
for {
key := datastore.NameKey("Process", processID, nil)
_, err := dsClient.Put(ctx, key, &ProcessRecord{
_, err := pool.GCEDSClient().Put(ctx, key, &ProcessRecord{
ID: processID,
Start: processStartTime,
LastHeartbeat: time.Now(),
Expand All @@ -51,23 +52,23 @@ func updateInstanceRecord() {
}

func putBuildRecord(br *types.BuildRecord) {
if dsClient == nil {
if pool.GCEDSClient() == nil {
return
}
ctx := context.Background()
key := datastore.NameKey("Build", br.ID, nil)
if _, err := dsClient.Put(ctx, key, br); err != nil {
if _, err := pool.GCEDSClient().Put(ctx, key, br); err != nil {
log.Printf("datastore Build Put: %v", err)
}
}

func putSpanRecord(sr *types.SpanRecord) {
if dsClient == nil {
if pool.GCEDSClient() == nil {
return
}
ctx := context.Background()
key := datastore.NameKey("Span", fmt.Sprintf("%s-%v-%v", sr.BuildID, sr.StartTime.UnixNano(), sr.Event), nil)
if _, err := dsClient.Put(ctx, key, sr); err != nil {
if _, err := pool.GCEDSClient().Put(ctx, key, sr); err != nil {
log.Printf("datastore Span Put: %v", err)
}
}
5 changes: 3 additions & 2 deletions cmd/coordinator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"golang.org/x/build/cmd/coordinator/metrics"
"golang.org/x/build/internal/coordinator/pool"

"github.com/golang/protobuf/ptypes"
metpb "google.golang.org/genproto/googleapis/api/metric"
Expand Down Expand Up @@ -66,8 +67,8 @@ func reportReverseCountMetrics(ctx context.Context) error {
})
}

return metricsClient.CreateTimeSeries(ctx, &monpb.CreateTimeSeriesRequest{
Name: m.DescriptorPath(buildEnv.ProjectName),
return pool.MetricsClient().CreateTimeSeries(ctx, &monpb.CreateTimeSeriesRequest{
Name: m.DescriptorPath(pool.GCEBuildEnv().ProjectName),
TimeSeries: ts,
})
}
5 changes: 3 additions & 2 deletions cmd/coordinator/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kr/pty"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/gophers"
"golang.org/x/build/internal/secret"
"golang.org/x/build/types"
Expand Down Expand Up @@ -525,11 +526,11 @@ func listenAndServeSSH(sc *secret.Client) {
log.Fatal(err)
}
} else {
if storageClient == nil {
if pool.StorageClient() == nil {
log.Printf("GCS storage client not available; not running SSH server.")
return
}
r, err := storageClient.Bucket(buildEnv.BuildletBucket).Object("coordinator-gomote-ssh.key").NewReader(context.Background())
r, err := pool.StorageClient().Bucket(pool.GCEBuildEnv().BuildletBucket).Object("coordinator-gomote-ssh.key").NewReader(context.Background())
if err != nil {
log.Printf("Failed to read ssh host key: %v; not running SSH server.", err)
return
Expand Down
7 changes: 4 additions & 3 deletions cmd/coordinator/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/coordinator/pool"
)

type TestBuildletPool struct {
Expand All @@ -30,7 +31,7 @@ type TestBuildletPool struct {

// GetBuildlet finds the first available buildlet for the hostType and returns
// it, or an error if no buildlets are available for that hostType.
func (tp *TestBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
func (tp *TestBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg pool.Logger) (*buildlet.Client, error) {
tp.mu.Lock()
defer tp.mu.Unlock()
c, ok := tp.clients[hostType]
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestHandleBuildletCreate_PreStream(t *testing.T) {
defer log.SetOutput(os.Stderr)
addBuilder(buildName)
remoteBuildlets.m = map[string]*remoteBuildlet{}
testPoolHook = func(_ *dashboard.HostConfig) BuildletPool { return testPool }
testPoolHook = func(_ *dashboard.HostConfig) pool.Buildlet { return testPool }
defer func() {
timeNow = time.Now
removeBuilder(buildName)
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestHandleBuildletCreate_Stream(t *testing.T) {
defer log.SetOutput(os.Stderr)
addBuilder(buildName)
remoteBuildlets.m = map[string]*remoteBuildlet{}
testPoolHook = func(_ *dashboard.HostConfig) BuildletPool { return testPool }
testPoolHook = func(_ *dashboard.HostConfig) pool.Buildlet { return testPool }
defer func() {
timeNow = time.Now
removeBuilder(buildName)
Expand Down
5 changes: 3 additions & 2 deletions cmd/coordinator/reverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
)
Expand Down Expand Up @@ -295,7 +296,7 @@ func (p *reverseBuildletPool) updateWaiterCounter(hostType string, delta int) {
p.waiters[hostType] += delta
}

func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg pool.Logger) (*buildlet.Client, error) {
p.updateWaiterCounter(hostType, 1)
defer p.updateWaiterCounter(hostType, -1)
seenErrInUse := false
Expand Down Expand Up @@ -324,7 +325,7 @@ func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string,
}
}

func (p *reverseBuildletPool) cleanedBuildlet(b *buildlet.Client, lg logger) (*buildlet.Client, error) {
func (p *reverseBuildletPool) cleanedBuildlet(b *buildlet.Client, lg pool.Logger) (*buildlet.Client, error) {
// Clean up any files from previous builds.
sp := lg.CreateSpan("clean_buildlet", b.String())
err := b.RemoveAll(context.Background(), ".")
Expand Down
7 changes: 4 additions & 3 deletions cmd/coordinator/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/buildgo"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/spanlog"
"golang.org/x/build/types"
)
Expand All @@ -43,7 +44,7 @@ type Scheduler struct {
// A getBuildletResult is a buildlet that was just created and is up and
// is ready to be assigned to a caller based on priority.
type getBuildletResult struct {
Pool BuildletPool
Pool pool.Buildlet
HostType string

// One of Client or Err gets set:
Expand Down Expand Up @@ -142,7 +143,7 @@ func (l stderrLogger) CreateSpan(event string, optText ...string) spanlog.Span {

// getPoolBuildlet is launched as its own goroutine to do a
// potentially long blocking cal to pool.GetBuildlet.
func (s *Scheduler) getPoolBuildlet(pool BuildletPool, hostType string) {
func (s *Scheduler) getPoolBuildlet(pool pool.Buildlet, hostType string) {
res := getBuildletResult{
Pool: pool,
HostType: hostType,
Expand Down Expand Up @@ -341,7 +342,7 @@ type SchedItem struct {
s *Scheduler
requestTime time.Time
tryFor string // TODO: which user. (user with 1 trybot >> user with 50 trybots)
pool BuildletPool
pool pool.Buildlet
ctxDone <-chan struct{}

// wantRes is the unbuffered channel that's passed
Expand Down
Loading

0 comments on commit 02e10ad

Please sign in to comment.