Skip to content

Commit

Permalink
Skip requests in non-leading replicas on not found errors
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Jan 12, 2024
1 parent 47efd37 commit f2b2656
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/core/admissioncheck_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,5 +226,5 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *confi
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler).
WithEventFilter(r).
Complete(WithLeadingManager(mgr, r, cfg))
Complete(WithLeadingManager(mgr, r, &kueue.AdmissionCheck{}, cfg))
}
2 changes: 1 addition & 1 deletion pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.
WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler).
WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler).
WithEventFilter(r).
Complete(WithLeadingManager(mgr, r, cfg))
Complete(WithLeadingManager(mgr, r, &kueue.ClusterQueue{}, cfg))
}

func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/core/leader_aware_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
Expand All @@ -47,7 +48,7 @@ const defaultRequeueDuration = 15 * time.Second
// in the non-leading replicas.
// - Transition to actually reconciling requests in the replica that may acquire
// the leader election lease, in case the previously leading replica failed to renew it.
func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg *config.Configuration) reconcile.Reconciler {
func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj client.Object, cfg *config.Configuration) reconcile.Reconciler {
// Do not decorate the reconciler if leader election is disabled
if cfg.LeaderElection == nil || !ptr.Deref(cfg.LeaderElection.LeaderElect, false) {
return reconciler
Expand All @@ -63,14 +64,18 @@ func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg *

return &leaderAwareReconciler{
elected: mgr.Elected(),
client: mgr.GetClient(),
delegate: reconciler,
object: obj,
requeueDuration: requeueDuration,
}
}

type leaderAwareReconciler struct {
elected <-chan struct{}
client client.Client
delegate reconcile.Reconciler
object client.Object
requeueDuration time.Duration
}

Expand All @@ -82,6 +87,10 @@ func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile
// The manager has been elected leader, delegate reconciliation to the provided reconciler.
return r.delegate.Reconcile(ctx, request)
default:
if err := r.client.Get(ctx, request.NamespacedName, r.object); err != nil {
// Discard the reconciliation request so requests are not re-enqueue indefinitely.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// The manager hasn't been elected leader yet, requeue the reconciliation request
// to prevent against any missed / discarded events over the period it takes
// to fail over a new leading replica, which can take as much as the configured
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Co
WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}).
Watches(&kueue.ClusterQueue{}, &queueCQHandler).
WithEventFilter(r).
Complete(WithLeadingManager(mgr, r, cfg))
Complete(WithLeadingManager(mgr, r, &kueue.LocalQueue{}, cfg))
}

func (r *LocalQueueReconciler) UpdateStatusIfChanged(
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/resourceflavor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager, cfg *confi
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler).
WithEventFilter(r).
Complete(WithLeadingManager(mgr, r, cfg))
Complete(WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg))
}

func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Conf
Watches(&nodev1.RuntimeClass{}, ruh).
Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}).
WithEventFilter(r).
Complete(WithLeadingManager(mgr, r, cfg))
Complete(WithLeadingManager(mgr, r, &kueue.Workload{}, cfg))
}

// admittedNotReadyWorkload returns as a pair of values. The first boolean determines
Expand Down

0 comments on commit f2b2656

Please sign in to comment.