Skip to content

Commit

Permalink
[YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 3, 2024
1 parent 8a4585f commit f642183
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 234 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
59 changes: 20 additions & 39 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,8 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {

if !common.Equals(prevCapacity, newCapacity) {
// update capacity
if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
if err := ctx.updateNodeResources(node, newCapacity, nil); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))

Check warning on line 219 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L219

Added line #L219 was not covered by tests
}
}
}
Expand Down Expand Up @@ -370,7 +366,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource)
allocReq := common.CreateAllocationForForeignPod(pod)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
log.Log(log.ShimContext).Error("failed to add foreign allocation to the core",
zap.Error(err))

Check warning on line 372 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L371-L372

Added lines #L371 - L372 were not covered by tests
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod",
Expand All @@ -394,8 +394,12 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))

Check warning on line 401 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L400-L401

Added lines #L400 - L401 were not covered by tests
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod",
Expand Down Expand Up @@ -441,40 +445,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}

// deleteForeignPod handles the deletion of a foreign pod.
//
// NOTE(review): this span is a rendered commit diff, so removed and added lines are interleaved
// with Codecov annotation text; it is not compilable as shown.
//
// Two variants are visible:
//   - one looks the pod up in the scheduler cache by UID, returns early when it is unknown, and,
//     for a pod that was not terminated and is not orphaned, calls updateNodeOccupiedResources
//     with schedulercache.SubOccupiedResource before removing the pod from the cache;
//   - the other builds a release request from the pod UID and constants.DefaultPartition, sends it
//     through SchedulerAPI.UpdateAllocation, logs an error if that call fails, and then removes
//     the pod from the scheduler cache.
//
// Presumably the UpdateAllocation variant is the post-commit code (the Codecov note refers to
// those lines as "added") — confirm against the repository.
func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name))
return
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))

Check warning on line 451 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L450-L451

Added lines #L450 - L451 were not covered by tests
}

// conditions for release:
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", string(oldPod.Status.Phase)),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means the pod is terminated
// we need to subtract the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
}
ctx.schedulerCache.RemovePod(pod)
}
log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}

//nolint:unused
func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
if common.IsZero(resource) {
return
Expand Down Expand Up @@ -1560,7 +1541,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
}

// updateNodeResources builds a node update request for node.Name with
// common.CreateUpdateRequestForUpdatedNode and submits it through SchedulerAPI.UpdateNode,
// returning the error from that call.
//
// NOTE(review): this span is a rendered commit diff, so two `request :=` lines appear. One
// forwards the occupied argument, the other passes nil in its place; in the nil variant the
// occupied parameter is not read by this function. Presumably the nil variant is the
// post-commit line — confirm against the repository.
func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

Expand Down
Loading

0 comments on commit f642183

Please sign in to comment.