Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Apr 1, 2024
1 parent 406d5dc commit 96f452d
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ test-scalability: envtest
./bin/scalability-runner \
--o $(ARTIFACTS)/scalability \
--crds=$(PROJECT_DIR)/config/components/crd/bases \
--withCPUProfile=true \
--generatorConfig=$(PROJECT_DIR)/test/scalability/default_generator_config.yaml \
# --withCPUProfile=true \
# --withLogs=true \
# --logToFile=true

Expand Down
6 changes: 3 additions & 3 deletions test/scalability/default_generator_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
workloads:
- &small
className: small
msRunning: 10
msRunning: 100
priority: 50
request: 1
- *small
Expand All @@ -24,11 +24,11 @@
- *small
- &medium
className: medium
msRunning: 50000
msRunning: 500
priority: 100
request: 5
- *medium
- className: large
msRunning: 100000
msRunning: 1000
priority: 200
request: 20
11 changes: 6 additions & 5 deletions test/scalability/testrunner/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ var _ reconcile.Reconciler = (*reconciler)(nil)
var _ predicate.Predicate = (*reconciler)(nil)

func (r *reconciler) Create(ev event.CreateEvent) bool {
if wl, isWl := (ev.Object).(*kueue.Workload); isWl {
wl, isWl := (ev.Object).(*kueue.Workload)
if isWl {
r.recorder.RecordWorkloadState(wl)
}
return false
return !isWl
}

func (r *reconciler) Delete(_ event.DeleteEvent) bool {
Expand All @@ -88,7 +89,7 @@ func (r *reconciler) Delete(_ event.DeleteEvent) bool {
func (r *reconciler) Update(ev event.UpdateEvent) bool {
wl, isWl := (ev.ObjectNew).(*kueue.Workload)
if !isWl {
return false
return true
}
admitted := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted)
r.setAdmittedTime(wl.UID, admitted)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
_ = workload.UnsetQuotaReservationWithCondition(&wl, "Pending", "Evicted by the test runner")
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
if err == nil {
log.V(2).Info("Finish eviction")
log.V(5).Info("Finish eviction")
}
return reconcile.Result{}, err
}
Expand All @@ -140,7 +141,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
} else {
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName)
if err == nil {
log.V(2).Info("Finish Workload")
log.V(5).Info("Finish Workload")
}
return reconcile.Result{}, err
}
Expand Down
36 changes: 27 additions & 9 deletions test/scalability/testrunner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"path"
"sync"
"syscall"
"time"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -132,6 +133,7 @@ func main() {
os.Exit(1)
}

wg.Add(1)
recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh)
if err != nil {
log.Error(err, "recorder start")
Expand All @@ -151,19 +153,29 @@ func main() {

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
endWithError := false
select {
case <-done:
log.Info("Done")
case err := <-errCh:
log.Error(err, "Error")
if err != nil {
log.Error(err, "Error")
endWithError = true
}
}
ctxCancel()

wg.Wait()

// stop the manager

// interpret events / reporting
if !endWithError {
err := recorder.WriteSummary(path.Join(*outputDir, "summary.yaml"))
if err != nil {
log.Error(err, "Writing summary")
}
err = recorder.WriteCQCsv(path.Join(*outputDir, "cqEvennts.csv"))
if err != nil {
log.Error(err, "Writing cq csv")
}
}
}

func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUProf, withLogs, logToFile bool, errCh chan<- error, wg *sync.WaitGroup) error {
Expand All @@ -180,6 +192,7 @@ func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUPro
args = append(args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe)))
}

// TODO: having some "time" like stats could help
cmd := exec.CommandContext(ctx, cmdPath, args...)
cmd.Cancel = func() error {
log.Info("Stop the command")
Expand Down Expand Up @@ -266,13 +279,18 @@ func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string,
func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}) (*recorder.Recorder, error) {
defer wg.Done()
log := ctrl.LoggerFrom(ctx).WithName("Start recorder")

recorder := recorder.New()
//TODO: make the timeout an arg
recorder := recorder.New(30 * time.Minute)
wg.Add(1)
go func() {
defer wg.Done()
recorder.Run(ctx, genDone)
log.Info("Recorder done")
err := recorder.Run(ctx, genDone)
if err != nil {
log.Error(err, "Recorder run")
} else {
log.Info("Recorder done")
}
errCh <- err
}()

log.Info("Recorder started ")
Expand Down
Loading

0 comments on commit 96f452d

Please sign in to comment.