Skip to content

Commit

Permalink
internal/pool: move the kubernetes buildlet pool into a pool package
Browse files Browse the repository at this point in the history
This is one step in a series of changes that will move everything
buildlet-pool related into a pool package.

Updates golang/go#36841

Change-Id: I8efb1f94c7b929be559004d9f455bca0370c7800
Reviewed-on: https://go-review.googlesource.com/c/build/+/227768
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
  • Loading branch information
cagedmantis committed Apr 20, 2020
1 parent 02e10ad commit 1f68cb0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 35 deletions.
14 changes: 7 additions & 7 deletions cmd/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ func main() {
}

// TODO(evanbrown): disable kubePool if init fails
err = initKube()
err = pool.InitKube(monitorGitMirror)
if err != nil {
kubeErr = err
pool.KubeSetErr(err)
log.Printf("Kube support disabled due to error initializing Kubernetes: %v", err)
}

Expand Down Expand Up @@ -346,8 +346,8 @@ func main() {
pool.GetGCEBuildletPool().SetEnabled(*devEnableGCE)
} else {
go pool.GetGCEBuildletPool().CleanUpOldVMs()
if kubeErr == nil {
go kubePool.cleanUpOldPodsLoop(context.Background())
if pool.KubeErr() == nil {
go pool.KubePool().CleanUpOldPodsLoop(context.Background())
}

if pool.GCEInStaging() {
Expand Down Expand Up @@ -1628,10 +1628,10 @@ func poolForConf(conf *dashboard.HostConfig) pool.Buildlet {
case conf.IsVM():
return pool.GetGCEBuildletPool()
case conf.IsContainer():
if pool.GCEBuildEnv().PreferContainersOnCOS || kubeErr != nil {
if pool.GCEBuildEnv().PreferContainersOnCOS || pool.KubeErr() != nil {
return pool.GetGCEBuildletPool() // it also knows how to do containers.
} else {
return kubePool
return pool.KubePool()
}
case conf.IsReverse:
return reversePool
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func (st *buildStatus) forceSnapshotUsage() {
}

func (st *buildStatus) getCrossCompileConfig() *dashboard.CrossCompileConfig {
if kubeErr != nil {
if pool.KubeErr() != nil {
return nil
}
config := st.conf.CrossCompileConfig
Expand Down
4 changes: 2 additions & 2 deletions cmd/coordinator/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func monitorGitMirror() {
func gitMirrorErrors() (errs []string) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pods, err := goKubeClient.GetPods(ctx)
pods, err := pool.KubeGoClient().GetPods(ctx)
if err != nil {
log.Println("gitMirrorErrors: goKubeClient.GetPods:", err)
return []string{"failed to get pods; can't query gitmirror status"}
Expand Down Expand Up @@ -662,7 +662,7 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
data.GCEPoolStatus = template.HTML(buf.String())
buf.Reset()

kubePool.WriteHTMLStatus(&buf)
pool.KubePool().WriteHTMLStatus(&buf)
data.KubePoolStatus = template.HTML(buf.String())
buf.Reset()

Expand Down
83 changes: 57 additions & 26 deletions cmd/coordinator/kube.go → internal/coordinator/pool/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// +build go1.13
// +build linux darwin

package main
package pool

import (
"context"
Expand All @@ -22,7 +22,6 @@ import (

"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/sourcecache"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
Expand All @@ -34,6 +33,15 @@ import (
This file implements the Kubernetes-based buildlet pool.
*/

const (
// podDeleteTimeout is how long before we delete a pod.
// In practice this need only be as long as the slowest
// builder (plan9 currently), because on startup this program
// already deletes all buildlets it doesn't know about
// (i.e. ones from a previous instance of the coordinator).
podDeleteTimeout = 45 * time.Minute
)

// Initialized by initKube:
var (
buildletsKubeClient *kubernetes.Client // for "buildlets" cluster
Expand All @@ -43,14 +51,17 @@ var (
kubeCluster *container.Cluster
)

// initGCE must be called before initKube
func initKube() error {
if pool.GCEBuildEnv().KubeBuild.MaxNodes == 0 {
// MonitorGitMirrorFunc is the type of a function used to monitor
// the gitmirror service; it is supplied by the caller of InitKube.
type MonitorGitMirrorFunc func()

// InitKube initializes Kubernetes buildlet-pool support.
// InitGCE must be called before InitKube.
func InitKube(monitorGitMirror MonitorGitMirrorFunc) error {
if GCEBuildEnv().KubeBuild.MaxNodes == 0 {
return errors.New("Kubernetes builders disabled due to KubeBuild.MaxNodes == 0")
}

// projectID was set by initGCE
registryPrefix += "/" + pool.GCEBuildEnv().ProjectName
registryPrefix += "/" + GCEBuildEnv().ProjectName
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope.")
}
Expand All @@ -59,19 +70,19 @@ func initKube() error {
defer cancel() // ctx is only used for discovery and connect; not retained.
var err error
buildletsKubeClient, err = gke.NewClient(ctx,
pool.GCEBuildEnv().KubeBuild.Name,
gke.OptZone(pool.GCEBuildEnv().ControlZone),
gke.OptProject(pool.GCEBuildEnv().ProjectName),
gke.OptTokenSource(pool.GCPCredentials().TokenSource))
GCEBuildEnv().KubeBuild.Name,
gke.OptZone(GCEBuildEnv().ControlZone),
gke.OptProject(GCEBuildEnv().ProjectName),
gke.OptTokenSource(GCPCredentials().TokenSource))
if err != nil {
return err
}

goKubeClient, err = gke.NewClient(ctx,
pool.GCEBuildEnv().KubeTools.Name,
gke.OptZone(pool.GCEBuildEnv().ControlZone),
gke.OptProject(pool.GCEBuildEnv().ProjectName),
gke.OptTokenSource(pool.GCPCredentials().TokenSource))
GCEBuildEnv().KubeTools.Name,
gke.OptZone(GCEBuildEnv().ControlZone),
gke.OptProject(GCEBuildEnv().ProjectName),
gke.OptTokenSource(GCPCredentials().TokenSource))
if err != nil {
return err
}
Expand All @@ -85,6 +96,26 @@ func initKube() error {
return nil
}

// KubeSetErr records err as the package-level Kubernetes error,
// which is later reported by KubeErr. Passing nil clears it.
func KubeSetErr(err error) {
kubeErr = err
}

// KubeErr returns the error recorded by KubeSetErr (or during
// Kubernetes initialization); a nil result means kube support
// is believed usable.
func KubeErr() error {
return kubeErr
}

// KubePool returns the package-level Kubernetes buildlet pool.
// NOTE(review): this exported function returns the unexported type
// *kubeBuildletPool, so callers outside this package can only use
// its exported methods — consider exporting the type or an interface.
func KubePool() *kubeBuildletPool {
return kubePool
}

// KubeGoClient returns the Kubernetes client for the go cluster
// (the tools cluster, as opposed to the "buildlets" cluster served
// by buildletsKubeClient).
func KubeGoClient() *kubernetes.Client {
return goKubeClient
}

// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
mu sync.Mutex // guards all following
Expand Down Expand Up @@ -136,12 +167,12 @@ func (p *kubeBuildletPool) pollCapacityLoop() {
func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
nodes, err := buildletsKubeClient.GetNodes(ctx)
if err != nil {
log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", GCEBuildEnv().ProjectName, GCEBuildEnv().Region(), err)
return
}
pods, err := buildletsKubeClient.GetPods(ctx)
if err != nil {
log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", GCEBuildEnv().ProjectName, GCEBuildEnv().Region(), err)
return
}

Expand Down Expand Up @@ -210,7 +241,7 @@ func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {

}

func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg pool.Logger) (*buildlet.Client, error) {
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg Logger) (*buildlet.Client, error) {
hconf, ok := dashboard.Hosts[hostType]
if !ok || !hconf.IsContainer() {
return nil, fmt.Errorf("kubepool: invalid host type %q", hostType)
Expand All @@ -222,7 +253,7 @@ func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg
panic("expect non-nil buildletsKubeClient")
}

deleteIn, ok := ctx.Value(pool.BuildletTimeoutOpt{}).(time.Duration)
deleteIn, ok := ctx.Value(BuildletTimeoutOpt{}).(time.Duration)
if !ok {
deleteIn = podDeleteTimeout
}
Expand All @@ -237,7 +268,7 @@ func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg
log.Printf("Creating Kubernetes pod %q for %s", podName, hostType)

bc, err := buildlet.StartPod(ctx, buildletsKubeClient, podName, hostType, buildlet.PodOpts{
ProjectID: pool.GCEBuildEnv().ProjectName,
ProjectID: GCEBuildEnv().ProjectName,
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s", hostType),
DeleteIn: deleteIn,
Expand Down Expand Up @@ -354,16 +385,16 @@ func (p *kubeBuildletPool) podUsed(podName string) bool {
return ok
}

func (p *kubeBuildletPool) podsActive() (ret []pool.ResourceTime) {
func (p *kubeBuildletPool) podsActive() (ret []ResourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, ph := range p.pods {
ret = append(ret, pool.ResourceTime{
ret = append(ret, ResourceTime{
Name: name,
Creation: ph.requestedAt,
})
}
sort.Sort(pool.ByCreationTime(ret))
sort.Sort(ByCreationTime(ret))
return ret
}

Expand All @@ -376,7 +407,7 @@ func (p *kubeBuildletPool) String() string {
return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
}

// cleanUpOldPods loops forever and periodically enumerates pods
// CleanUpOldPodsLoop loops forever and periodically enumerates pods
// and deletes those which have expired.
//
// A Pod is considered expired if it has a "delete-at" metadata
Expand All @@ -389,7 +420,7 @@ func (p *kubeBuildletPool) String() string {
// stranded and wasting resources forever, we instead set the
// "delete-at" metadata attribute on them when created to some time
// that's well beyond their expected lifetime.
func (p *kubeBuildletPool) cleanUpOldPodsLoop(ctx context.Context) {
func (p *kubeBuildletPool) CleanUpOldPodsLoop(ctx context.Context) {
if buildletsKubeClient == nil {
log.Printf("cleanUpOldPods: no buildletsKubeClient configured; aborting.")
return
Expand Down Expand Up @@ -438,7 +469,7 @@ func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
}
if err == nil && time.Now().Unix() > unixDeadline {
stats.DeletedOld++
log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, pool.GCEBuildEnv().ControlZone)
log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, GCEBuildEnv().ControlZone)
err = buildletsKubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("cleanUpOldPods: problem deleting old pod %q: %v", pod.Name, err)
Expand Down Expand Up @@ -468,5 +499,5 @@ func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
}

func hasCloudPlatformScope() bool {
return pool.HasScope(container.CloudPlatformScope)
return HasScope(container.CloudPlatformScope)
}

0 comments on commit 1f68cb0

Please sign in to comment.