Skip to content

Commit

Permalink
Record events when QuotaReservation and Admition take place. (#1436) (#…
Browse files Browse the repository at this point in the history
…1458)

* Use different events for QuotaReservation and Admission

* Review Remarks

* Review Remarks

* Review Remarks
  • Loading branch information
trasc committed Dec 14, 2023
1 parent a05301a commit 41de014
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 14 deletions.
9 changes: 5 additions & 4 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

const (
KueueName = "kueue"
JobControllerName = KueueName + "-job-controller"
AdmissionName = KueueName + "-admission"
ReclaimablePodsMgr = KueueName + "-reclaimable-pods"
KueueName = "kueue"
JobControllerName = KueueName + "-job-controller"
WorkloadControllerName = KueueName + "-workload-controller"
AdmissionName = KueueName + "-admission"
ReclaimablePodsMgr = KueueName + "-reclaimable-pods"

// UpdatesBatchPeriod is the batch period to hold workload updates
// before syncing a Queue and ClusterQueue objects.
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

config "sigs.k8s.io/kueue/apis/config/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/queue"
)

Expand Down Expand Up @@ -61,7 +62,10 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := cqRec.SetupWithManager(mgr); err != nil {
return "ClusterQueue", err
}
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc, WithWorkloadUpdateWatchers(qRec, cqRec), WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil {
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc,
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil {
return "Workload", err
}
return "", nil
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -95,9 +96,10 @@ type WorkloadReconciler struct {
client client.Client
watchers []WorkloadUpdateWatcher
podsReadyTimeout *time.Duration
recorder record.EventRecorder
}

func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, opts ...Option) *WorkloadReconciler {
func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
options := defaultOptions
for _, opt := range opts {
opt(&options)
Expand All @@ -110,6 +112,7 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c
cache: cache,
watchers: options.watchers,
podsReadyTimeout: options.podsReadyTimeout,
recorder: recorder,
}
}

Expand Down Expand Up @@ -153,7 +156,15 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

if workload.SyncAdmittedCondition(&wl) {
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, err
}
if workload.IsAdmitted(&wl) {
c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, time.Since(c.LastTransitionTime.Time).Seconds())

}
return ctrl.Result{}, nil
}

if workload.HasQuotaReservation(&wl) {
Expand Down
67 changes: 67 additions & 0 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ limitations under the License.
package core

import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestAdmittedNotReadyWorkload(t *testing.T) {
Expand Down Expand Up @@ -290,3 +297,63 @@ func TestSyncCheckStates(t *testing.T) {
})
}
}

func TestReconcile(t *testing.T) {
cases := map[string]struct {
workload *kueue.Workload
wantError error
wantEvents []utiltesting.EventRecord
}{
"admit": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Normal",
Reason: "Admitted",
},
},
},
"already admited": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
objs := []client.Object{tc.workload}
clientBuilder := utiltesting.NewClientBuilder().WithObjects(objs...).WithStatusSubresource(objs...)
cl := clientBuilder.Build()
recorder := &utiltesting.EventRecorder{}

cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
reconciler := NewWorkloadReconciler(cl, qManager, cqCache, recorder)

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

_, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})

if diff := cmp.Diff(tc.wantError, gotError); diff != "" {
t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
})
}
}
12 changes: 10 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -450,8 +451,15 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set
s.admissionRoutineWrapper.Run(func() {
err := s.applyAdmission(ctx, newWorkload)
if err == nil {
waitTime := time.Since(e.Obj.CreationTimestamp.Time)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time was %.0fs", admission.ClusterQueue, waitTime.Seconds())
waitStarted := e.Obj.CreationTimestamp.Time
if c := apimeta.FindStatusCondition(e.Obj.Status.Conditions, kueue.WorkloadEvicted); c != nil {
waitStarted = c.LastTransitionTime.Time
}
waitTime := time.Since(waitStarted)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "QuotaReserved", "Quota reserved in ClusterQueue %v, wait time since queued was %.0fs", admission.ClusterQueue, waitTime.Seconds())
if workload.IsAdmitted(newWorkload) {
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was 0s ", admission.ClusterQueue)
}
metrics.AdmittedWorkload(admission.ClusterQueue, waitTime)
log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments)
return
Expand Down
72 changes: 67 additions & 5 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -194,14 +195,21 @@ func TestSchedule(t *testing.T) {

// disable partial admission
disablePartialAdmission bool

// ignored if empty, the Message is ignored (it contains the duration)
wantEvents []utiltesting.EventRecord
}{
"workload fits in single clusterQueue": {
"workload fits in single clusterQueue, with check state ready": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "sales").
Queue("main").
PodSets(*utiltesting.MakePodSet("one", 10).
Request(corev1.ResourceCPU, "1").
Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
},
wantAssignments: map[string]kueue.Admission{
Expand All @@ -222,6 +230,57 @@ func TestSchedule(t *testing.T) {
},
},
wantScheduled: []string{"sales/foo"},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "Admitted",
EventType: corev1.EventTypeNormal,
},
},
},
"workload fits in single clusterQueue, with check state pending": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "sales").
Queue("main").
PodSets(*utiltesting.MakePodSet("one", 10).
Request(corev1.ResourceCPU, "1").
Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStatePending,
}).
Obj(),
},
wantAssignments: map[string]kueue.Admission{
"sales/foo": {
ClusterQueue: "sales",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "one",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10000m"),
},
Count: ptr.To[int32](10),
},
},
},
},
wantScheduled: []string{"sales/foo"},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "sales", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
},
},
"error during admission": {
workloads: []kueue.Workload{
Expand Down Expand Up @@ -1044,7 +1103,6 @@ func TestSchedule(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.PartialAdmission, false)()
}
ctx, _ := utiltesting.ContextWithLog(t)
scheme := runtime.NewScheme()

allQueues := append(queues, tc.additionalLocalQueues...)
allClusterQueues := append(clusterQueues, tc.additionalClusterQueues...)
Expand All @@ -1058,9 +1116,7 @@ func TestSchedule(t *testing.T) {
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "sales", Labels: map[string]string{"dep": "sales"}}},
)
cl := clientBuilder.Build()
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme,
corev1.EventSource{Component: constants.AdmissionName})
recorder := &utiltesting.EventRecorder{}
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
// Workloads are loaded into queues or clusterQueues as we add them.
Expand Down Expand Up @@ -1153,6 +1209,12 @@ func TestSchedule(t *testing.T) {
if diff := cmp.Diff(tc.wantInadmissibleLeft, qDumpInadmissible); diff != "" {
t.Errorf("Unexpected elements left in inadmissible workloads (-want,+got):\n%s", diff)
}

if len(tc.wantEvents) > 0 {
if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
}
})
}
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/util/testing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package testing

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -64,3 +67,39 @@ func (b *builderIndexer) IndexField(ctx context.Context, obj client.Object, fiel
func AsIndexer(builder *fake.ClientBuilder) client.FieldIndexer {
return &builderIndexer{ClientBuilder: builder}
}

type EventRecord struct {
Key types.NamespacedName
EventType string
Reason string
Message string
// add annotations if ever needed
}

type EventRecorder struct {
lock sync.Mutex
RecordedEvents []EventRecord
}

func (tr *EventRecorder) Event(object runtime.Object, eventtype, reason, message string) {
tr.Eventf(object, eventtype, reason, message)
}

func (tr *EventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
tr.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...)
}

func (tr *EventRecorder) AnnotatedEventf(targetObject runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
tr.lock.Lock()
defer tr.lock.Unlock()
key := types.NamespacedName{}
if cobj, iscobj := targetObject.(client.Object); iscobj {
key = client.ObjectKeyFromObject(cobj)
}
tr.RecordedEvents = append(tr.RecordedEvents, EventRecord{
Key: key,
EventType: eventtype,
Reason: reason,
Message: fmt.Sprintf(messageFmt, args...),
})
}

0 comments on commit 41de014

Please sign in to comment.