Skip to content

Commit

Permalink
[kueuectl] Support paging on kueue CLI list commands. (kubernetes-sig…
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi authored and Fiona-Waters committed Jun 25, 2024
1 parent 23c4592 commit d842598
Show file tree
Hide file tree
Showing 11 changed files with 599 additions and 301 deletions.
28 changes: 28 additions & 0 deletions cmd/kueuectl/app/list/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,37 @@ limitations under the License.
package list

import (
"errors"
"os"
"strconv"

"github.com/spf13/cobra"
)

const (
defaultListRequestLimit = 100
KueuectlListRequestLimitEnvName = "KUEUECTL_LIST_REQUEST_LIMIT"
)

var (
invalidListRequestLimitError = errors.New("invalid list request limit")
)

func listRequestLimit() (int64, error) {
listRequestLimitEnv := os.Getenv(KueuectlListRequestLimitEnvName)

if len(listRequestLimitEnv) == 0 {
return defaultListRequestLimit, nil
}

limit, err := strconv.ParseInt(listRequestLimitEnv, 10, 64)
if err != nil {
return 0, invalidListRequestLimitError
}

return limit, nil
}

func addFieldSelectorFlagVar(cmd *cobra.Command, p *string) {
cmd.Flags().StringVar(p, "field-selector", "",
"Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
Expand Down
134 changes: 56 additions & 78 deletions cmd/kueuectl/app/list/list_clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ package list
import (
"context"
"errors"
"io"
"time"
"fmt"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/cli-runtime/pkg/printers"
Expand All @@ -48,6 +45,7 @@ type ClusterQueueOptions struct {
// PrintFlags holds options necessary for obtaining a printer
PrintFlags *genericclioptions.PrintFlags

Limit int64
LabelSelector string
FieldSelector string

Expand All @@ -56,20 +54,15 @@ type ClusterQueueOptions struct {
// Active means the cluster queue has kueue.ClusterQueueActive condition with status=metav1.ConditionTrue
Active []bool

Client kueuev1beta1.KueueV1beta1Interface
clusterQueueListRequestLimit int64

PrintObj printers.ResourcePrinterFunc
Client kueuev1beta1.KueueV1beta1Interface

genericiooptions.IOStreams
}

func NewClusterQueueOptions(streams genericiooptions.IOStreams) *ClusterQueueOptions {
return &ClusterQueueOptions{
PrintFlags: genericclioptions.NewPrintFlags("").WithTypeSetter(scheme.Scheme),
Active: make([]bool, 0),
clusterQueueListRequestLimit: 300,
IOStreams: streams,
PrintFlags: genericclioptions.NewPrintFlags("").WithTypeSetter(scheme.Scheme),
IOStreams: streams,
}
}

Expand Down Expand Up @@ -110,16 +103,6 @@ func (o *ClusterQueueOptions) Complete(clientGetter util.ClientGetter, cmd *cobr

o.Client = clientset.KueueV1beta1()

if !o.PrintFlags.OutputFlagSpecified() {
o.PrintObj = printClusterQueueTable
} else {
printer, err := o.PrintFlags.ToPrinter()
if err != nil {
return err
}
o.PrintObj = printer.PrintObj
}

if len(args) > 0 {
activeFlag, err := cmd.Flags().GetBoolSlice("active")
if err != nil {
Expand All @@ -131,6 +114,21 @@ func (o *ClusterQueueOptions) Complete(clientGetter util.ClientGetter, cmd *cobr
return nil
}

func (o *ClusterQueueOptions) ToPrinter(headers bool) (printers.ResourcePrinterFunc, error) {
if !o.PrintFlags.OutputFlagSpecified() {
printer := newClusterQueueTablePrinter().
WithHeaders(headers)
return printer.PrintObj, nil
}

printer, err := o.PrintFlags.ToPrinter()
if err != nil {
return nil, err
}

return printer.PrintObj, nil
}

func (o *ClusterQueueOptions) Validate() error {
if !o.validActiveFlagOptionProvided() {
return errors.New("only one active flag can be provided")
Expand All @@ -144,35 +142,56 @@ func (o *ClusterQueueOptions) validActiveFlagOptionProvided() bool {

// Run prints the cluster queues.
func (o *ClusterQueueOptions) Run(ctx context.Context) error {
continueToken := ""
var totalCount int

opts := metav1.ListOptions{
LabelSelector: o.LabelSelector,
FieldSelector: o.FieldSelector,
Limit: o.Limit,
}

tabWriter := printers.GetNewTabWriter(o.Out)

for {
cql, err := o.Client.ClusterQueues().List(ctx, metav1.ListOptions{
LabelSelector: o.LabelSelector,
FieldSelector: o.FieldSelector,
Limit: o.clusterQueueListRequestLimit,
Continue: continueToken,
})
headers := totalCount == 0

list, err := o.Client.ClusterQueues().List(ctx, opts)
if err != nil {
return err
}
if len(cql.Items) == 0 {
return nil
}

o.applyActiveFilter(cql)
o.filterList(list)

if err := o.PrintObj(cql, o.Out); err != nil {
totalCount += len(list.Items)

printer, err := o.ToPrinter(headers)
if err != nil {
return err
}

if err := printer.PrintObj(list, tabWriter); err != nil {
return err
}

if cql.Continue == "" {
if list.Continue != "" {
opts.Continue = list.Continue
continue
}

if totalCount == 0 {
fmt.Fprintln(o.ErrOut, "No resources found")
return nil
}
continueToken = cql.Continue

if err := tabWriter.Flush(); err != nil {
return err
}

return nil
}
}

func (o *ClusterQueueOptions) applyActiveFilter(cql *v1beta1.ClusterQueueList) {
func (o *ClusterQueueOptions) filterList(cql *v1beta1.ClusterQueueList) {
if o.Active == nil || len(o.Active) == 0 {
return
}
Expand All @@ -189,47 +208,6 @@ func (o *ClusterQueueOptions) applyActiveFilter(cql *v1beta1.ClusterQueueList) {
cql.Items = filtered
}

// printClusterQueueTable is a printer function for ClusterQueueList objects.
var _ printers.ResourcePrinterFunc = printClusterQueueTable

func printClusterQueueTable(obj runtime.Object, out io.Writer) error {
tp := printers.NewTablePrinter(printers.PrintOptions{})
a := &metav1.Table{
ColumnDefinitions: []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name"},
{Name: "Cohort", Type: "string"},
{Name: "Pending Workloads", Type: "integer"},
{Name: "Admitted Workloads", Type: "integer"},
{Name: "Active", Type: "boolean"},
{Name: "Age", Type: "string"},
},
Rows: toTableRows(obj.(*v1beta1.ClusterQueueList)),
}
return tp.PrintObj(a, out)
}

func toTableRows(list *v1beta1.ClusterQueueList) []metav1.TableRow {
rows := make([]metav1.TableRow, len(list.Items))
for index := range list.Items {
rows[index] = toTableRow(&list.Items[index])
}
return rows
}

func toTableRow(cq *v1beta1.ClusterQueue) metav1.TableRow {
return metav1.TableRow{
Object: runtime.RawExtension{Object: cq},
Cells: []interface{}{
cq.Name,
cq.Spec.Cohort,
cq.Status.PendingWorkloads,
cq.Status.AdmittedWorkloads,
isActiveStatus(cq),
duration.HumanDuration(time.Since(cq.CreationTimestamp.Time)),
},
}
}

func isActiveStatus(cq *v1beta1.ClusterQueue) bool {
return meta.IsStatusConditionTrue(cq.Status.Conditions, v1beta1.ClusterQueueActive)
}
92 changes: 92 additions & 0 deletions cmd/kueuectl/app/list/list_clusterqueue_printer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package list

import (
"errors"
"io"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/cli-runtime/pkg/printers"

"sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

type listClusterQueuePrinter struct {
printOptions printers.PrintOptions
}

var _ printers.ResourcePrinter = (*listClusterQueuePrinter)(nil)

func (p *listClusterQueuePrinter) PrintObj(obj runtime.Object, out io.Writer) error {
printer := printers.NewTablePrinter(p.printOptions)

list, ok := obj.(*v1beta1.ClusterQueueList)
if !ok {
return errors.New("invalid object type")
}

table := &metav1.Table{
ColumnDefinitions: []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name"},
{Name: "Cohort", Type: "string"},
{Name: "Pending Workloads", Type: "integer"},
{Name: "Admitted Workloads", Type: "integer"},
{Name: "Active", Type: "boolean"},
{Name: "Age", Type: "string"},
},
Rows: printClusterQueueList(list),
}

return printer.PrintObj(table, out)
}

func (p *listClusterQueuePrinter) WithHeaders(f bool) *listClusterQueuePrinter {
p.printOptions.NoHeaders = !f
return p
}

func newClusterQueueTablePrinter() *listClusterQueuePrinter {
return &listClusterQueuePrinter{}
}

func printClusterQueueList(list *v1beta1.ClusterQueueList) []metav1.TableRow {
rows := make([]metav1.TableRow, len(list.Items))
for index := range list.Items {
rows[index] = printClusterQueue(&list.Items[index])
}
return rows
}

func printClusterQueue(clusterQueue *v1beta1.ClusterQueue) metav1.TableRow {
row := metav1.TableRow{
Object: runtime.RawExtension{Object: clusterQueue},
}
row.Cells = []any{
clusterQueue.Name,
clusterQueue.Spec.Cohort,
clusterQueue.Status.PendingWorkloads,
clusterQueue.Status.AdmittedWorkloads,
isActiveStatus(clusterQueue),
duration.HumanDuration(time.Since(clusterQueue.CreationTimestamp.Time)),
}

return row
}
Loading

0 comments on commit d842598

Please sign in to comment.