From 96f452ddd4fc16215638d28e09594414284ee0f9 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Mon, 1 Apr 2024 17:49:00 +0300
Subject: [PATCH] wip

---
 Makefile                                       |   2 +-
 .../scalability/default_generator_config.yaml |   6 +-
 .../testrunner/controller/controller.go       |  11 +-
 test/scalability/testrunner/main.go            |  36 ++-
 .../testrunner/recorder/recorder.go            | 211 ++++++++++++++++--
 5 files changed, 235 insertions(+), 31 deletions(-)

diff --git a/Makefile b/Makefile
index c0f70b6498..74b894d83f 100644
--- a/Makefile
+++ b/Makefile
@@ -213,8 +213,8 @@ test-scalability: envtest
 	./bin/scalability-runner \
 		--o $(ARTIFACTS)/scalability \
 		--crds=$(PROJECT_DIR)/config/components/crd/bases \
-		--withCPUProfile=true \
 		--generatorConfig=$(PROJECT_DIR)/test/scalability/default_generator_config.yaml \
+#		--withCPUProfile=true \
 #		--withLogs=true \
 #		--logToFile=true
diff --git a/test/scalability/default_generator_config.yaml b/test/scalability/default_generator_config.yaml
index a738cbe723..f17297b0a6 100644
--- a/test/scalability/default_generator_config.yaml
+++ b/test/scalability/default_generator_config.yaml
@@ -13,7 +13,7 @@ workloads:
   - &small
     className: small
-    msRunning: 10
+    msRunning: 100
     priority: 50
     request: 1
   - *small
@@ -24,11 +24,11 @@
   - *small
   - &medium
     className: medium
-    msRunning: 50000
+    msRunning: 500
     priority: 100
     request: 5
   - *medium
   - className: large
-    msRunning: 100000
+    msRunning: 1000
     priority: 200
     request: 20
diff --git a/test/scalability/testrunner/controller/controller.go b/test/scalability/testrunner/controller/controller.go
index bb7c0a2469..6f50d0b418 100644
--- a/test/scalability/testrunner/controller/controller.go
+++ b/test/scalability/testrunner/controller/controller.go
@@ -74,10 +74,11 @@ var _ reconcile.Reconciler = (*reconciler)(nil)
 var _ predicate.Predicate = (*reconciler)(nil)
 
 func (r *reconciler) Create(ev event.CreateEvent) bool {
-	if wl, isWl := (ev.Object).(*kueue.Workload); isWl {
+	wl, isWl := (ev.Object).(*kueue.Workload)
+	if isWl {
 		r.recorder.RecordWorkloadState(wl)
 	}
-	return false
+	return !isWl
 }
 
 func (r *reconciler) Delete(_ event.DeleteEvent) bool {
@@ -88,7 +89,7 @@ func (r *reconciler) Delete(_ event.DeleteEvent) bool {
 func (r *reconciler) Update(ev event.UpdateEvent) bool {
 	wl, isWl := (ev.ObjectNew).(*kueue.Workload)
 	if !isWl {
-		return false
+		return true
 	}
 	admitted := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted)
 	r.setAdmittedTime(wl.UID, admitted)
@@ -116,7 +117,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		_ = workload.UnsetQuotaReservationWithCondition(&wl, "Pending", "Evicted by the test runner")
 		err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
 		if err == nil {
-			log.V(2).Info("Finish eviction")
+			log.V(5).Info("Finish eviction")
 		}
 		return reconcile.Result{}, err
 	}
@@ -140,7 +141,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	} else {
 		err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName)
 		if err == nil {
-			log.V(2).Info("Finish Workload")
+			log.V(5).Info("Finish Workload")
 		}
 		return reconcile.Result{}, err
 	}
diff --git a/test/scalability/testrunner/main.go b/test/scalability/testrunner/main.go
index f3dc16fc66..5d8351ab56 100644
--- a/test/scalability/testrunner/main.go
+++ b/test/scalability/testrunner/main.go
@@ -26,6 +26,7 @@ import (
 	"path"
 	"sync"
 	"syscall"
+	"time"
 
 	zaplog "go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -132,6 +133,7 @@ func main() {
 		os.Exit(1)
 	}
 
+	wg.Add(1)
 	recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh)
 	if err != nil {
 		log.Error(err, "recorder start")
@@ -151,19 +153,29 @@ func main() {
 	done := make(chan os.Signal, 1)
 	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
 
+	endWithError := false
 	select {
 	case <-done:
 		log.Info("Done")
 	case err := <-errCh:
-		log.Error(err, "Error")
+		if err != nil {
+			log.Error(err, "Error")
+			endWithError = true
+		}
 	}
 
 	ctxCancel()
-
 	wg.Wait()
 
-	// stop the manager
-
-	// interpret events / reporting
+	if !endWithError {
+		err := recorder.WriteSummary(path.Join(*outputDir, "summary.yaml"))
+		if err != nil {
+			log.Error(err, "Writing summary")
+		}
+		err = recorder.WriteCQCsv(path.Join(*outputDir, "cqEvents.csv"))
+		if err != nil {
+			log.Error(err, "Writing cq csv")
+		}
+	}
 }
 
 func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUProf, withLogs, logToFile bool, errCh chan<- error, wg *sync.WaitGroup) error {
@@ -180,6 +192,7 @@ func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUPro
 		args = append(args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe)))
 	}
 
+	// TODO: having some "time"-like stats could help
 	cmd := exec.CommandContext(ctx, cmdPath, args...)
 	cmd.Cancel = func() error {
 		log.Info("Stop the command")
@@ -266,13 +279,18 @@ func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string,
 func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}) (*recorder.Recorder, error) {
 	defer wg.Done()
 	log := ctrl.LoggerFrom(ctx).WithName("Start recorder")
-
-	recorder := recorder.New()
+	// TODO: make the timeout an arg
+	recorder := recorder.New(30 * time.Minute)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		recorder.Run(ctx, genDone)
-		log.Info("Recorder done")
+		err := recorder.Run(ctx, genDone)
+		if err != nil {
+			log.Error(err, "Recorder run")
+		} else {
+			log.Info("Recorder done")
+		}
+		errCh <- err
 	}()
 
 	log.Info("Recorder started ")
diff --git a/test/scalability/testrunner/recorder/recorder.go b/test/scalability/testrunner/recorder/recorder.go
index 5717b95fb8..a98f31d5b5 100644
--- a/test/scalability/testrunner/recorder/recorder.go
+++ b/test/scalability/testrunner/recorder/recorder.go
@@ -18,35 +18,188 @@ package recorder
 
 import (
 	"context"
+	"encoding/csv"
+	"os"
+	"strconv"
 	"sync/atomic"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/equality"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/yaml"
 )
 
-type event struct{}
+type event interface {
+	GetEventInfo() string
+}
+
+type CQStatus struct {
+	CPUReservation     int64
+	CPUUsage           int64
+	CPUQuota           int64
+	PendingWorkloads   int32
+	ReservingWorkloads int32
+	AdmittedWorkloads  int32
+	Active             bool
+}
+
+type CQEvent struct {
+	Time   time.Time
+	Name   string
+	Cohort string
+	UID    types.UID
+
+	Status CQStatus
+}
+
+func (e *CQEvent) GetEventInfo() string {
+	// TODO...
+ return "" +} + +func (e *CQEvent) CsvRecord(t0 time.Time) []string { + var usagePrc, reservingPrc int64 + if e.Status.CPUQuota > 0 { + reservingPrc = e.Status.CPUReservation * 100 / e.Status.CPUQuota + usagePrc = e.Status.CPUUsage * 100 / e.Status.CPUQuota + } + deltaT := e.Time.Sub(t0) + return []string{ + strconv.FormatInt(deltaT.Milliseconds(), 10), + e.Cohort, + e.Name, + strconv.FormatBool(e.Status.Active), + strconv.FormatInt(e.Status.CPUReservation, 10), + strconv.FormatInt(e.Status.CPUUsage, 10), + strconv.FormatInt(reservingPrc, 10), + strconv.FormatInt(usagePrc, 10), + strconv.FormatInt(int64(e.Status.PendingWorkloads), 10), + strconv.FormatInt(int64(e.Status.ReservingWorkloads), 10), + strconv.FormatInt(int64(e.Status.AdmittedWorkloads), 10), + } +} + +type CQEvents struct { + Events []*CQEvent + LastEvent *CQEvent +} + +type CQStore struct { + All []*CQEvent + ByCq map[string]CQEvents + ByCohort map[string]sets.Set[string] +} + +type Store struct { + CQ CQStore +} type Recorder struct { - running atomic.Bool - evChan chan event + maxRecording time.Duration + running atomic.Bool + evChan chan event + + Store Store } -func New() *Recorder { +func New(maxRecording time.Duration) *Recorder { return &Recorder{ - evChan: make(chan event, 10), + maxRecording: maxRecording, + running: atomic.Bool{}, + evChan: make(chan event, 10), + Store: Store{ + CQ: CQStore{ + All: []*CQEvent{}, + ByCq: map[string]CQEvents{}, + ByCohort: map[string]sets.Set[string]{}, + }, + }, } } func (r *Recorder) record(ev event) { //TODO: + switch v := ev.(type) { + case *CQEvent: + last := r.Store.CQ.ByCq[v.Name] + if last.LastEvent == nil || !equality.Semantic.DeepEqual(last.LastEvent.Status, v.Status) { + // add it if different + r.Store.CQ.All = append(r.Store.CQ.All, v) + r.Store.CQ.ByCq[v.Name] = CQEvents{ + LastEvent: v, + Events: append(r.Store.CQ.ByCq[v.Name].Events, v), + } + if _, fond := r.Store.CQ.ByCohort[v.Cohort]; !fond { + r.Store.CQ.ByCohort[v.Cohort] = sets.New[string]() + } + r.Store.CQ.ByCohort[v.Cohort].Insert(v.Name) + } + default: + // none fore now + } } func (r *Recorder) expectMoreEvents() bool { - //TODO: - return true + for _, cqStatus := range r.Store.CQ.ByCq { + s := cqStatus.LastEvent.Status + if (s.PendingWorkloads > 0 || s.ReservingWorkloads > 0) && s.Active { + return true + } + } + return false } -func (r *Recorder) Run(ctx context.Context, genDone <-chan struct{}) { +func (r *Recorder) WriteSummary(path string) error { + // TODO: extract proper KPIs + bytes, err := yaml.Marshal(r.Store) + if err != nil { + return err + } + return os.WriteFile(path, bytes, 0666) +} + +func (r *Recorder) WriteCQCsv(path string) (err error) { + var f *os.File + f, err = os.Create(path) + if err != nil { + return err + } + defer f.Close() + cWriter := csv.NewWriter(f) + + defer func() { + cWriter.Flush() + if err == nil { + err = cWriter.Error() + } + }() + + err = cWriter.Write([]string{"time", "cohort", "cluster queue", "active", "reserving mCPU", "using mCPU", "reserving CPU%", "using CPU%", "pending workloads", "reserving workloads", "admitted workloads"}) + if err != nil { + return err + } + + var t0 time.Time + if len(r.Store.CQ.All) > 0 { + t0 = r.Store.CQ.All[0].Time + } + + for _, ev := range r.Store.CQ.All { + err = cWriter.Write(ev.CsvRecord(t0)) + if err != nil { + return err + } + } + + return err +} + +func (r *Recorder) Run(ctx context.Context, genDone <-chan struct{}) error { r.running.Store(true) defer r.running.Store(false) @@ -57,14 +210,17 @@ func (r *Recorder) Run(ctx 
context.Context, genDone <-chan struct{}) {
 		generateDone.Store(true)
 	}()
 
+	ctx, cancel := context.WithTimeout(ctx, r.maxRecording)
+	defer cancel()
+
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			return ctx.Err()
 		case ev := <-r.evChan:
 			r.record(ev)
 			if generateDone.Load() && !r.expectMoreEvents() {
-				return
+				return nil
 			}
 		}
 	}
@@ -75,15 +231,44 @@ func (r *Recorder) RecordWorkloadState(wl *kueue.Workload) {
 		return
 	}
 	// TODO:
+	// for now, only monitor the CQs
 
-	r.evChan <- event{}
+	// r.evChan <- event{}
 }
 
 func (r *Recorder) RecordCQState(cq *kueue.ClusterQueue) {
 	if !r.running.Load() {
 		return
 	}
-	// TODO:
-	r.evChan <- event{}
+	// for all of this we assume a single flavor and only CPU resources
+	var cpuReserved, cpuUsed, cpuQuota int64
+	if len(cq.Status.FlavorsReservation) > 0 && len(cq.Status.FlavorsReservation[0].Resources) > 0 {
+		cpuReserved = cq.Status.FlavorsReservation[0].Resources[0].Total.MilliValue()
+	}
+
+	if len(cq.Status.FlavorsUsage) > 0 && len(cq.Status.FlavorsUsage[0].Resources) > 0 {
+		cpuUsed = cq.Status.FlavorsUsage[0].Resources[0].Total.MilliValue()
+	}
+
+	if len(cq.Spec.ResourceGroups) > 0 && len(cq.Spec.ResourceGroups[0].Flavors) > 0 && len(cq.Spec.ResourceGroups[0].Flavors[0].Resources) > 0 {
+		cpuQuota = cq.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota.MilliValue()
+	}
+
+	r.evChan <- &CQEvent{
+		Time:   time.Now(),
+		Name:   cq.Name,
+		Cohort: cq.Spec.Cohort,
+		UID:    cq.UID,
+
+		Status: CQStatus{
+			CPUReservation:     cpuReserved,
+			CPUUsage:           cpuUsed,
+			CPUQuota:           cpuQuota,
+			PendingWorkloads:   cq.Status.PendingWorkloads,
+			ReservingWorkloads: cq.Status.ReservingWorkloads,
+			AdmittedWorkloads:  cq.Status.AdmittedWorkloads,
+			Active:             apimeta.IsStatusConditionTrue(cq.Status.Conditions, kueue.ClusterQueueActive),
+		},
+	}
 }
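--
Note (not part of the patch): until WriteSummary extracts proper KPIs, the
cqEvents.csv written by WriteCQCsv can already be post-processed offline. The
sketch below is one possible consumer, assuming only the column layout written
above; the input path and the chosen KPI (the last offset at which each cluster
queue still reported pending workloads) are illustrative, not part of this
change.

package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strconv"
)

func main() {
	// Hypothetical path; matches the runner's --o output directory layout.
	f, err := os.Open("artifacts/scalability/cqEvents.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		panic(err)
	}

	// Column indices follow the header written by WriteCQCsv:
	// 0 = time (ms since first event), 2 = cluster queue, 8 = pending workloads.
	lastPending := map[string]int64{}
	for _, row := range rows[1:] { // skip the header row
		ms, _ := strconv.ParseInt(row[0], 10, 64)
		pending, _ := strconv.ParseInt(row[8], 10, 64)
		if pending > 0 {
			lastPending[row[2]] = ms
		}
	}

	for cq, ms := range lastPending {
		fmt.Printf("%s: last pending workload observed at ~%dms\n", cq, ms)
	}
}

Because record() only stores status changes, the CSV is already deduplicated,
so a linear scan like this is enough for simple per-queue timelines.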