[YUNIKORN-1748] Create test case which measures binding throughput using MockScheduler
pbacsko committed Jul 27, 2023
1 parent 9cab77c commit 1368d84
Showing 1 changed file with 270 additions and 0 deletions.
pkg/shim/scheduler_perf_test.go: 270 additions, 0 deletions
@@ -0,0 +1,270 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package shim

import (
    "context"
    "fmt"
    "io"
    "os"
    "runtime/pprof"
    "strconv"
    "strings"
    "sync"
    "testing"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "gotest.tools/v3/assert"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"

    "github.com/apache/yunikorn-k8shim/pkg/common/constants"
)

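// Tunables for the local benchmark: the switch that enables the test, the profiling flags and
// output paths, the size of the simulated cluster (nodes and per-node capacity) and the total
// number of pods to schedule.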
var (
    queueConfig = `
partitions:
  - name: default
    queues:
      - name: root
        submitacl: "*"
        queues:
          - name: a
          - name: b
          - name: c
          - name: d
          - name: e
`
    enabled         = false
    profileCpu      = true
    profileHeap     = true
    numNodes        = 5000
    totalPods       = int64(50_000)
    nodeCpuMilli    = int64(16_000)
    nodeMemGiB      = int64(16)
    nodeNumPods     = int64(110)
    cpuProfilePath  = "/tmp/yunikorn-cpu.pprof"
    heapProfilePath = "/tmp/yunikorn-heap.pprof"
)

// Simple performance test which measures the theoretical throughput of the scheduler core.
// It is intended to be run locally: "enabled" must stay false by default so that the test is skipped during "make test".
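// To run it locally, set "enabled" to true and execute, for example:
//   go test -v -run TestSchedulingThroughPut ./pkg/shim/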
func TestSchedulingThroughPut(t *testing.T) {
    if !enabled {
        return
    }

    cluster := &MockScheduler{}
    cluster.init()
    cluster.start()
    defer cluster.stop()

    if profileCpu {
        f, err := os.Create(cpuProfilePath)
        assert.NilError(t, err, "could not create file for cpu profile")
        err = pprof.StartCPUProfile(f)
        assert.NilError(t, err, "could not start cpu profiling")
        defer pprof.StopCPUProfile()
    }

    if profileHeap {
        f, err := os.Create(heapProfilePath)
        assert.NilError(t, err, "could not create file for heap profile")

        defer func(w io.Writer) {
            err := pprof.WriteHeapProfile(w)
            assert.NilError(t, err, "could not write heap profile")
        }(f)
    }

    cluster.waitForSchedulerState(t, SchedulerStates().Running)
    err := cluster.updateConfig(queueConfig)
    assert.NilError(t, err, "update config failed")

    for i := 0; i < numNodes; i++ {
        addNode(cluster, "test.host."+strconv.Itoa(i))
    }
    // wait for nodes to get registered
    time.Sleep(5 * time.Second)

    addPodsToCluster(cluster)

    collector := &metricsCollector{}
    go collector.collectData()

    err = wait.PollUntilContextTimeout(context.Background(), time.Second, time.Second*60, true, func(ctx context.Context) (done bool, err error) {
        return cluster.GetPodBindStats().Success == totalPods, nil
    })
    assert.NilError(t, err, "scheduling did not finish in time")

    b := cluster.GetPodBindStats()
    diff := b.Last.Sub(b.First)
    fmt.Printf("Overall throughput: %.0f allocations/s\n", float64(totalPods)/diff.Seconds())
    fmt.Println("Container allocation throughput based on metrics")
    for _, d := range collector.getData() {
        if d != 0 {
            fmt.Printf("%.0f container allocations/s\n", d)
        }
    }
}

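// addPodsToCluster submits the workload evenly across the five leaf queues:
// 5 queues * 80 apps * 125 pods per app = 50,000 pods, matching totalPods.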
func addPodsToCluster(cluster *MockScheduler) {
    // make sure that total number of pods == totalPods
    pods := getTestPods(80, 125, "root.a")
    for _, pod := range pods {
        cluster.AddPod(pod)
    }
    pods = getTestPods(80, 125, "root.b")
    for _, pod := range pods {
        cluster.AddPod(pod)
    }
    pods = getTestPods(80, 125, "root.c")
    for _, pod := range pods {
        cluster.AddPod(pod)
    }
    pods = getTestPods(80, 125, "root.d")
    for _, pod := range pods {
        cluster.AddPod(pod)
    }
    pods = getTestPods(80, 125, "root.e")
    for _, pod := range pods {
        cluster.AddPod(pod)
    }
}

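// metricsCollector stores the observed per-second container allocation rates;
// the embedded mutex guards concurrent access from the collector goroutine and the test.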
type metricsCollector struct {
    podsPerSec []float64
    sync.Mutex
}

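// collectData polls the Prometheus registry once per second and records the delta of the
// "yunikorn_scheduler_container_allocation_attempt_total" counter for the "allocated" state,
// i.e. the number of containers allocated during the last second.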
func (m *metricsCollector) collectData() {
    prev := float64(-1)

    for {
        time.Sleep(time.Second)

        mfs, err := prometheus.DefaultGatherer.Gather()
        if err != nil {
            panic(err)
        }

        for _, mf := range mfs {
            if strings.EqualFold(mf.GetName(), "yunikorn_scheduler_container_allocation_attempt_total") {
                for _, metric := range mf.Metric {
                    for _, label := range metric.Label {
                        if *label.Name == "state" && *label.Value == "allocated" {
                            if prev == -1 {
                                prev = *metric.Counter.Value
                                continue
                            }
                            current := *metric.Counter.Value
                            m.update(current - prev)
                            prev = current
                        }
                    }
                }
            }
        }
    }
}

func (m *metricsCollector) update(data float64) {
    m.Lock()
    defer m.Unlock()
    m.podsPerSec = append(m.podsPerSec, data)
}

func (m *metricsCollector) getData() []float64 {
    m.Lock()
    defer m.Unlock()
    var data []float64
    data = append(data, m.podsPerSec...)
    return data
}

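// getTestPods creates noApps * noTasksPerApp pods for the given queue;
// each pod requests 10 millicores of CPU and 1 MB of memory.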
func getTestPods(noApps, noTasksPerApp int, queue string) []*v1.Pod {
    podCount := noApps * noTasksPerApp
    pods := make([]*v1.Pod, 0, podCount)
    resources := make(map[v1.ResourceName]resource.Quantity)
    resources[v1.ResourceCPU] = *resource.NewMilliQuantity(10, resource.DecimalSI)
    resources[v1.ResourceMemory] = *resource.NewScaledQuantity(1, resource.Mega)

    for i := 0; i < noApps; i++ {
        appId := "app000" + strconv.Itoa(i) + "-" + strconv.FormatInt(time.Now().UnixMilli(), 10)
        for j := 0; j < noTasksPerApp; j++ {
            taskName := "task000" + strconv.Itoa(j)
            pod := &v1.Pod{
                TypeMeta: metav1.TypeMeta{
                    Kind:       "Pod",
                    APIVersion: "v1",
                },
                ObjectMeta: metav1.ObjectMeta{
                    Name: taskName,
                    UID:  types.UID("UID-" + appId + "-" + taskName),
                    Annotations: map[string]string{
                        constants.AnnotationApplicationID: appId,
                        constants.AnnotationQueueName:     queue,
                    },
                },
                Spec: v1.PodSpec{
                    Containers: []v1.Container{
                        {
                            Name: "container-01",
                            Resources: v1.ResourceRequirements{
                                Requests: resources,
                            },
                        },
                    },
                    SchedulerName: constants.SchedulerName,
                },
            }
            pods = append(pods, pod)
        }
    }

    return pods
}

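// addNode registers a schedulable, ready node with the capacity defined by the tunables above
// (16 CPU cores, 16 GB of memory and 110 pods per node by default).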
func addNode(cluster *MockScheduler, name string) {
    node := &v1.Node{
        Spec: v1.NodeSpec{
            Unschedulable: false,
        },
        ObjectMeta: metav1.ObjectMeta{
            Name: name,
            UID:  types.UID("UUID-" + name),
        },
        Status: v1.NodeStatus{
            Conditions: []v1.NodeCondition{
                {Type: v1.NodeReady, Status: v1.ConditionTrue},
            },
            Allocatable: v1.ResourceList{
                v1.ResourceCPU:    *resource.NewMilliQuantity(nodeCpuMilli, resource.DecimalSI),
                v1.ResourceMemory: *resource.NewScaledQuantity(nodeMemGiB, resource.Giga),
                v1.ResourcePods:   *resource.NewScaledQuantity(nodeNumPods, resource.Scale(0)),
            },
        },
    }

    cluster.AddNode(node)
}
