Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record events when QuotaReservation and Admition take place. #1436

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

const (
KueueName = "kueue"
JobControllerName = KueueName + "-job-controller"
AdmissionName = KueueName + "-admission"
ReclaimablePodsMgr = KueueName + "-reclaimable-pods"
KueueName = "kueue"
JobControllerName = KueueName + "-job-controller"
WorkloadControllerName = KueueName + "-workload-controller"
AdmissionName = KueueName + "-admission"
ReclaimablePodsMgr = KueueName + "-reclaimable-pods"

// UpdatesBatchPeriod is the batch period to hold workload updates
// before syncing a Queue and ClusterQueue objects.
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

config "sigs.k8s.io/kueue/apis/config/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/queue"
)

Expand Down Expand Up @@ -61,7 +62,10 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := cqRec.SetupWithManager(mgr); err != nil {
return "ClusterQueue", err
}
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc, WithWorkloadUpdateWatchers(qRec, cqRec), WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil {
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc,
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil {
return "Workload", err
}
return "", nil
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -95,9 +96,10 @@ type WorkloadReconciler struct {
client client.Client
watchers []WorkloadUpdateWatcher
podsReadyTimeout *time.Duration
recorder record.EventRecorder
}

func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, opts ...Option) *WorkloadReconciler {
func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
options := defaultOptions
for _, opt := range opts {
opt(&options)
Expand All @@ -110,6 +112,7 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c
cache: cache,
watchers: options.watchers,
podsReadyTimeout: options.podsReadyTimeout,
recorder: recorder,
}
}

Expand Down Expand Up @@ -153,7 +156,15 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

if workload.SyncAdmittedCondition(&wl) {
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, err
}
if workload.IsAdmitted(&wl) {
c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, time.Since(c.LastTransitionTime.Time).Seconds())

}
return ctrl.Result{}, nil
}

if workload.HasQuotaReservation(&wl) {
Expand Down
67 changes: 67 additions & 0 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ limitations under the License.
package core

import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestAdmittedNotReadyWorkload(t *testing.T) {
Expand Down Expand Up @@ -290,3 +297,63 @@ func TestSyncCheckStates(t *testing.T) {
})
}
}

func TestReconcile(t *testing.T) {
cases := map[string]struct {
workload *kueue.Workload
wantError error
wantEvents []utiltesting.EventRecord
}{
"admit": {
trasc marked this conversation as resolved.
Show resolved Hide resolved
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Normal",
Reason: "Admitted",
},
},
},
"already admited": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
objs := []client.Object{tc.workload}
clientBuilder := utiltesting.NewClientBuilder().WithObjects(objs...).WithStatusSubresource(objs...)
cl := clientBuilder.Build()
recorder := &utiltesting.EventRecorder{}

cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
reconciler := NewWorkloadReconciler(cl, qManager, cqCache, recorder)

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

_, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})

if diff := cmp.Diff(tc.wantError, gotError); diff != "" {
t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
})
}
}
12 changes: 10 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -450,8 +451,15 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set
s.admissionRoutineWrapper.Run(func() {
err := s.applyAdmission(ctx, newWorkload)
if err == nil {
waitTime := time.Since(e.Obj.CreationTimestamp.Time)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time was %.0fs", admission.ClusterQueue, waitTime.Seconds())
waitStarted := e.Obj.CreationTimestamp.Time
if c := apimeta.FindStatusCondition(e.Obj.Status.Conditions, kueue.WorkloadEvicted); c != nil {
waitStarted = c.LastTransitionTime.Time
}
waitTime := time.Since(waitStarted)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "QuotaReserved", "Quota reserved in ClusterQueue %v, wait time since queued was %.0fs", admission.ClusterQueue, waitTime.Seconds())
if workload.IsAdmitted(newWorkload) {
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was 0s ", admission.ClusterQueue)
}
metrics.AdmittedWorkload(admission.ClusterQueue, waitTime)
log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments)
return
Expand Down
72 changes: 67 additions & 5 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -194,14 +195,21 @@ func TestSchedule(t *testing.T) {

// disable partial admission
disablePartialAdmission bool

// ignored if empty, the Message is ignored (it contains the duration)
wantEvents []utiltesting.EventRecord
}{
"workload fits in single clusterQueue": {
"workload fits in single clusterQueue, with check state ready": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "sales").
Queue("main").
PodSets(*utiltesting.MakePodSet("one", 10).
Request(corev1.ResourceCPU, "1").
Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
},
wantAssignments: map[string]kueue.Admission{
Expand All @@ -222,6 +230,57 @@ func TestSchedule(t *testing.T) {
},
},
wantScheduled: []string{"sales/foo"},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "Admitted",
EventType: corev1.EventTypeNormal,
},
},
},
"workload fits in single clusterQueue, with check state pending": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "sales").
Queue("main").
PodSets(*utiltesting.MakePodSet("one", 10).
Request(corev1.ResourceCPU, "1").
Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStatePending,
}).
Obj(),
},
wantAssignments: map[string]kueue.Admission{
"sales/foo": {
ClusterQueue: "sales",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "one",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10000m"),
},
Count: ptr.To[int32](10),
},
},
},
},
wantScheduled: []string{"sales/foo"},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
},
},
"error during admission": {
workloads: []kueue.Workload{
Expand Down Expand Up @@ -1044,7 +1103,6 @@ func TestSchedule(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.PartialAdmission, false)()
}
ctx, _ := utiltesting.ContextWithLog(t)
scheme := runtime.NewScheme()

allQueues := append(queues, tc.additionalLocalQueues...)
allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...)
Expand All @@ -1058,9 +1116,7 @@ func TestSchedule(t *testing.T) {
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "sales", Labels: map[string]string{"dep": "sales"}}},
)
cl := clientBuilder.Build()
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme,
corev1.EventSource{Component: constants.AdmissionName})
recorder := &utiltesting.EventRecorder{}
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
// Workloads are loaded into queues or clusterQueues as we add them.
Expand Down Expand Up @@ -1153,6 +1209,12 @@ func TestSchedule(t *testing.T) {
if diff := cmp.Diff(tc.wantInadmissibleLeft, qDumpInadmissible); diff != "" {
t.Errorf("Unexpected elements left in inadmissible workloads (-want,+got):\n%s", diff)
}

if len(tc.wantEvents) > 0 {
trasc marked this conversation as resolved.
Show resolved Hide resolved
if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
}
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/testing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package testing
import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -76,6 +77,7 @@ type EventRecord struct {
}

type EventRecorder struct {
lock sync.Mutex
RecordedEvents []EventRecord
}

Expand All @@ -88,6 +90,8 @@ func (tr *EventRecorder) Eventf(object runtime.Object, eventtype, reason, messag
}

func (tr *EventRecorder) AnnotatedEventf(targetObject runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
tr.lock.Lock()
defer tr.lock.Unlock()
key := types.NamespacedName{}
if cobj, iscobj := targetObject.(client.Object); iscobj {
key = client.ObjectKeyFromObject(cobj)
Expand Down