Support sidecar scheduling #3706

Merged · 2 commits · Sep 5, 2024
78 changes: 46 additions & 32 deletions pkg/scheduler/api/job_info.go
@@ -119,10 +119,11 @@ type TaskInfo struct {
     // LastTransaction holds the context of last scheduling transaction
     LastTransaction *TransactionContext

-    Priority    int32
-    VolumeReady bool
-    Preemptable bool
-    BestEffort  bool
+    Priority                    int32
+    VolumeReady                 bool
+    Preemptable                 bool
+    BestEffort                  bool
+    HasRestartableInitContainer bool

     // RevocableZone supports setting volcano.sh/revocable-zone annotation or label for pod/podgroup
     // we only support empty value or * value for this version and we will support specify revocable zone name for future releases
@@ -176,23 +177,25 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
     revocableZone := GetPodRevocableZone(pod)
     topologyInfo := GetPodTopologyInfo(pod)
     role := getTaskRole(pod)
+    hasRestartableInitContainer := hasRestartableInitContainer(pod)

     jobID := getJobID(pod)

     ti := &TaskInfo{
-        UID:           TaskID(pod.UID),
-        Job:           jobID,
-        Name:          pod.Name,
-        Namespace:     pod.Namespace,
-        TaskRole:      role,
-        Priority:      1,
-        Pod:           pod,
-        Resreq:        resReq,
-        InitResreq:    initResReq,
-        Preemptable:   preemptable,
-        BestEffort:    bestEffort,
-        RevocableZone: revocableZone,
-        NumaInfo:      topologyInfo,
+        UID:                         TaskID(pod.UID),
+        Job:                         jobID,
+        Name:                        pod.Name,
+        Namespace:                   pod.Namespace,
+        TaskRole:                    role,
+        Priority:                    1,
+        Pod:                         pod,
+        Resreq:                      resReq,
+        InitResreq:                  initResReq,
+        Preemptable:                 preemptable,
+        BestEffort:                  bestEffort,
+        HasRestartableInitContainer: hasRestartableInitContainer,
+        RevocableZone:               revocableZone,
+        NumaInfo:                    topologyInfo,
         TransactionContext: TransactionContext{
             NodeName: pod.Spec.NodeName,
             Status:   getTaskStatus(pod),
@@ -254,21 +257,22 @@ func (ti *TaskInfo) UnsetPodResourceDecision() {
 // Clone is used for cloning a task
 func (ti *TaskInfo) Clone() *TaskInfo {
     return &TaskInfo{
-        UID:           ti.UID,
-        Job:           ti.Job,
-        Name:          ti.Name,
-        Namespace:     ti.Namespace,
-        TaskRole:      ti.TaskRole,
-        Priority:      ti.Priority,
-        PodVolumes:    ti.PodVolumes,
-        Pod:           ti.Pod,
-        Resreq:        ti.Resreq.Clone(),
-        InitResreq:    ti.InitResreq.Clone(),
-        VolumeReady:   ti.VolumeReady,
-        Preemptable:   ti.Preemptable,
-        BestEffort:    ti.BestEffort,
-        RevocableZone: ti.RevocableZone,
-        NumaInfo:      ti.NumaInfo.Clone(),
+        UID:                         ti.UID,
+        Job:                         ti.Job,
+        Name:                        ti.Name,
+        Namespace:                   ti.Namespace,
+        TaskRole:                    ti.TaskRole,
+        Priority:                    ti.Priority,
+        PodVolumes:                  ti.PodVolumes,
+        Pod:                         ti.Pod,
+        Resreq:                      ti.Resreq.Clone(),
+        InitResreq:                  ti.InitResreq.Clone(),
+        VolumeReady:                 ti.VolumeReady,
+        Preemptable:                 ti.Preemptable,
+        BestEffort:                  ti.BestEffort,
+        HasRestartableInitContainer: ti.HasRestartableInitContainer,
+        RevocableZone:               ti.RevocableZone,
+        NumaInfo:                    ti.NumaInfo.Clone(),
         TransactionContext: TransactionContext{
             NodeName: ti.NodeName,
             Status:   ti.Status,
@@ -277,6 +281,16 @@ func (ti *TaskInfo) Clone() *TaskInfo {
     }
 }

+// hasRestartableInitContainer reports whether the pod has a restartable init container.
+func hasRestartableInitContainer(pod *v1.Pod) bool {
+    for _, c := range pod.Spec.InitContainers {
+        if c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways {
+            return true
+        }
+    }
+    return false
+}
+
 // String returns the taskInfo details in a string
 func (ti TaskInfo) String() string {
     res := fmt.Sprintf("Task (%v:%v/%v): taskSpec %s, job %v, status %v, pri %v, "+
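
For context: since Kubernetes v1.28, the SidecarContainers feature (enabled by default since v1.29) models a sidecar as an init container whose restartPolicy is Always, and the helper above flags exactly those pods. A minimal, self-contained illustration with a copy of the helper inlined (the pod shape and names are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func hasRestartableInitContainer(pod *v1.Pod) bool {
	for _, c := range pod.Spec.InitContainers {
		if c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways {
			return true
		}
	}
	return false
}

func main() {
	restartAlways := v1.ContainerRestartPolicyAlways
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			InitContainers: []v1.Container{
				{Name: "init-db"},                                    // ordinary init container: runs to completion
				{Name: "log-shipper", RestartPolicy: &restartAlways}, // sidecar: keeps running alongside the main containers
			},
		},
	}
	fmt.Println(hasRestartableInitContainer(pod)) // true, because of "log-shipper"
}
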
24 changes: 21 additions & 3 deletions pkg/scheduler/api/pod_info.go
@@ -26,7 +26,7 @@ import (
     "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 )

-// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
+// Refer k8s.io/kubernetes/pkg/api/v1/resource/helpers.go#PodRequests.
 //
 // GetResourceRequest returns a *Resource that covers the largest width in each resource dimension.
 // Because init-containers run sequentially, we collect the max in each dimension iteratively.
@@ -59,10 +59,28 @@ import (
 func GetPodResourceRequest(pod *v1.Pod) *Resource {
     result := GetPodResourceWithoutInitContainers(pod)

-    // take max_resource(sum_pod, any_init_container)
+    restartableInitContainerReqs := EmptyResource()
+    initContainerReqs := EmptyResource()
     for _, container := range pod.Spec.InitContainers {
-        result.SetMaxResource(NewResource(container.Resources.Requests))
+        containerReq := NewResource(container.Resources.Requests)
+
+        if container.RestartPolicy != nil && *container.RestartPolicy == v1.ContainerRestartPolicyAlways {
+            // Add the restartable container's req to the resulting cumulative container requests.
+            result.Add(containerReq)
+
+            // Track our cumulative restartable init container resources
+            restartableInitContainerReqs.Add(containerReq)
+            containerReq = restartableInitContainerReqs
+        } else {
+            tmp := EmptyResource()
+            tmp.Add(containerReq)
+            tmp.Add(restartableInitContainerReqs)
+            containerReq = tmp
+        }
+        initContainerReqs.SetMaxResource(containerReq)
     }

+    result.SetMaxResource(initContainerReqs)
     result.AddScalar(v1.ResourcePods, 1)

     return result
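
The accounting above mirrors upstream PodRequests: every restartable init container (sidecar) request is added to the pod's long-running total, and each ordinary init container must additionally fit beside all sidecars started before it; the pod's request is the max of the long-running total and the largest init step. A self-contained sketch of that rule over a single CPU dimension (this is not Volcano code; the helper name and values are illustrative, and it uses the max builtin from Go 1.21+):

package main

import "fmt"

type initContainer struct {
	cpu         int64 // requested CPU, in whole cores for simplicity
	restartable bool  // true => sidecar (restartPolicy: Always)
}

// effectiveCPU applies the same rule as GetPodResourceRequest to one dimension.
func effectiveCPU(mainSum int64, inits []initContainer) int64 {
	longRunning := mainSum // regular containers plus all sidecars
	sidecarSum := int64(0) // sidecars started so far
	initPeak := int64(0)   // largest single init step
	for _, c := range inits {
		if c.restartable {
			longRunning += c.cpu
			sidecarSum += c.cpu
			initPeak = max(initPeak, sidecarSum)
		} else {
			// An ordinary init container runs next to every sidecar already started.
			initPeak = max(initPeak, c.cpu+sidecarSum)
		}
	}
	return max(longRunning, initPeak)
}

func main() {
	// The "multiple restartable and regular init containers" test case below:
	// init-2 (5 CPU) runs beside sidecars 1+2, giving 5+3 = 8, which dominates
	// the long-running total 1 (main) + 1+2+3 (sidecars) = 7.
	fmt.Println(effectiveCPU(1, []initContainer{
		{cpu: 5}, {cpu: 1, restartable: true}, {cpu: 2, restartable: true},
		{cpu: 5}, {cpu: 3, restartable: true},
	})) // 8
}
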
192 changes: 192 additions & 0 deletions pkg/scheduler/api/pod_info_test.go
@@ -29,6 +29,7 @@ import (
 )

 func TestGetPodResourceRequest(t *testing.T) {
+    restartAlways := v1.ContainerRestartPolicyAlways
     tests := []struct {
         name             string
         pod              *v1.Pod
@@ -86,6 +87,197 @@ func TestGetPodResourceRequest(t *testing.T) {
},
expectedResource: buildResource("3000m", "5G", map[string]string{"pods": "1"}, 0),
},
// test case with restartable containers, mainly derived from k8s.io/kubernetes/pkg/api/v1/resource/helpers_test.go#TestPodResourceRequests
{
name: "restartable init container",
// restartable init + regular container
expectedResource: buildResource("2", "0", map[string]string{"pods": "1"}, 0),
pod: &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "restartable-init-1",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},

Containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
},
},
{
name: "multiple restartable init containers",
// max(5, restartable init containers(3+2+1) + regular(1)) = 7
expectedResource: buildResource("7", "0", map[string]string{"pods": "1"}, 0),
pod: &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "init-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
},
},
{
Name: "restartable-init-1",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
{
Name: "restartable-init-2",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Name: "restartable-init-3",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
Containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
},
},
{
name: "multiple restartable and regular init containers",
// init-2 requires 5 + the previously running restartable init
// containers(1+2) = 8, the restartable init container that starts
// after it doesn't count
expectedResource: buildResource("8", "0", map[string]string{"pods": "1"}, 0),
pod: &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "init-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
},
},
{
Name: "restartable-init-1",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
{
Name: "restartable-init-2",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
},
{
Name: "init-2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5"),
},
},
},
{
Name: "restartable-init-3",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
},
},
},
},
Containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
},
},
{
name: "restartable-init, init and regular",
expectedResource: buildResource("210", "0", map[string]string{"pods": "1"}, 0),
pod: &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "restartable-init-1",
RestartPolicy: &restartAlways,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("10"),
},
},
},
{
Name: "init-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200"),
},
},
},
},
Containers: []v1.Container{
{
Name: "container-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100"),
},
},
},
},
},
},
},
}

for i, test := range tests {
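
Assuming a standard checkout, these new cases can be exercised from the repository root with: go test -run TestGetPodResourceRequest ./pkg/scheduler/api/
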
10 changes: 10 additions & 0 deletions pkg/scheduler/plugins/predicates/predicates.go
@@ -292,6 +292,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
EnableVolumeCapacityPriority: utilFeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableNodeInclusionPolicyInPodTopologySpread: utilFeature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: utilFeature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnableSidecarContainers: utilFeature.DefaultFeatureGate.Enabled(features.SidecarContainers),
}
// Initialize k8s plugins
// TODO: Add more predicates, k8s.io/kubernetes/pkg/scheduler/framework/plugins/legacy_registry.go
@@ -338,6 +339,15 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return err
}
}
// Check restartable init container
if !features.EnableSidecarContainers && task.HasRestartableInitContainer {
    // The scheduler computes a resource request for a pod with restartable
    // init containers that is equal to or greater than what the kubelet
    // needs to run it, so there is no overbooking. However, to avoid
    // inconsistent resource accounting between the scheduler and an older
    // (pre-v1.28) kubelet, make such a pod unschedulable.
    return fmt.Errorf("pod has a restartable init container and the SidecarContainers feature is disabled")
}

// InterPodAffinity Predicate
// TODO: Update the node information to be processed by the filter based on the node list returned by the prefilter.
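
The net effect of the predicate change, condensed into a standalone decision sketch (allowTask and its signature are illustrative, not Volcano's API; the gate argument stands in for utilFeature.DefaultFeatureGate.Enabled(features.SidecarContainers)):

package main

import (
	"errors"
	"fmt"
)

// allowTask rejects a task only when its pod needs sidecar semantics (a
// restartable init container) while the SidecarContainers gate is off, since
// a pre-v1.28 kubelet would account for the pod's resources differently than
// the scheduler does.
func allowTask(sidecarGateEnabled, hasRestartableInitContainer bool) error {
	if !sidecarGateEnabled && hasRestartableInitContainer {
		return errors.New("pod has a restartable init container and the SidecarContainers feature is disabled")
	}
	return nil
}

func main() {
	fmt.Println(allowTask(true, true))   // <nil>: gate on, sidecar pods are schedulable
	fmt.Println(allowTask(false, true))  // rejected at predicate time
	fmt.Println(allowTask(false, false)) // <nil>: no sidecars, the gate is irrelevant
}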