Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2832] [core] Add non-Yunikorn allocation tracking logic #975

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21

require (
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) {
continue
}
// at some point, we may need to handle new requests as well
if newAlloc {
if newAlloc && !alloc.IsForeign() {
cc.notifyRMNewAllocation(request.RmID, alloc)
}
}
Expand Down
100 changes: 96 additions & 4 deletions pkg/scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import (
const pName = "default"

type mockEventHandler struct {
eventHandled bool
rejectedNodes []*si.RejectedNode
acceptedNodes []*si.AcceptedNode
eventHandled bool
rejectedNodes []*si.RejectedNode
acceptedNodes []*si.AcceptedNode
newAllocHandler func(*rmevent.RMNewAllocationsEvent)
}

func newMockEventHandler() *mockEventHandler {
Expand All @@ -56,6 +57,10 @@ func (m *mockEventHandler) HandleEvent(ev interface{}) {
m.rejectedNodes = append(m.rejectedNodes, nodeEvent.RejectedNodes...)
m.acceptedNodes = append(m.acceptedNodes, nodeEvent.AcceptedNodes...)
}

if allocEvent, ok := ev.(*rmevent.RMNewAllocationsEvent); ok && m.newAllocHandler != nil {
m.newAllocHandler(allocEvent)
}
}

func createTestContext(t *testing.T, partitionName string) *ClusterContext {
Expand All @@ -71,7 +76,13 @@ func createTestContext(t *testing.T, partitionName string) *ClusterContext {
Name: "root",
Parent: true,
SubmitACL: "*",
Queues: nil,
Queues: []configs.QueueConfig{
{
Name: "default",
Parent: false,
SubmitACL: "*",
},
},
},
},
}
Expand Down Expand Up @@ -296,6 +307,87 @@ func TestContextDrainingNodeBackToSchedulableMetrics(t *testing.T) {
verifyMetrics(t, 0, "draining")
}

func TestContext_OnAllocationNotification(t *testing.T) {
context := createTestContext(t, pName)
eventHandler := context.rmEventHandler.(*mockEventHandler) //nolint:errcheck
var lastAllocEvent *rmevent.RMNewAllocationsEvent
eventHandler.newAllocHandler = func(event *rmevent.RMNewAllocationsEvent) {
lastAllocEvent = event
go func() {
event.Channel <- &rmevent.Result{Succeeded: true}
}()
}

n := getNodeInfoForAddingNode()
err := context.addNode(n, true)
assert.NilError(t, err, "unexpected error returned from addNode")
partition := context.GetPartition(pName)
assert.Assert(t, partition != nil)
assert.Equal(t, 1, len(partition.GetNodes()), "expected node not found on partition")

// register application
appReq := &si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
QueueName: defQueue,
PartitionName: pName,
Ugi: &si.UserGroupInformation{
User: "testuser",
Groups: []string{"testgroup"},
},
ApplicationID: appID1,
},
},
RmID: "rm:123",
}
context.handleRMUpdateApplicationEvent(&rmevent.RMUpdateApplicationEvent{Request: appReq})

// add a Yunikorn allocation
allocReq := &si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: allocKey,
ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"first": {Value: 1},
},
},
ApplicationID: appID1,
NodeID: "test-1",
PartitionName: pName,
},
},
RmID: "rm:123",
}
context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: allocReq})
assert.Assert(t, lastAllocEvent != nil)
assert.Equal(t, lastAllocEvent.Allocations[0].AllocationKey, allocKey)

// add a non-Yunikorn allocation
lastAllocEvent = nil
nonYkAllocReq := &si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: "foreign-alloc-1",
ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"first": {Value: 1},
},
},
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: "test-1",
PartitionName: pName,
},
},
RmID: "rm:123",
}

context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: nonYkAllocReq})
assert.Assert(t, lastAllocEvent == nil, "unexpected allocation event")
}

func getNodeInfoForAddingNode() *si.NodeInfo {
n := &si.NodeInfo{
NodeID: "test-1",
Expand Down
26 changes: 26 additions & 0 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Allocation struct {
originator bool
tags map[string]string
resKeyWithoutNode string // the reservation key without node
foreign bool
preemptable bool

// Mutable fields which need protection
allocated bool
Expand Down Expand Up @@ -106,6 +108,20 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
createTime = time.Unix(siCreationTime, 0)
}

foreign := false
preemptable := true
if foreignType, ok := alloc.AllocationTags[siCommon.Foreign]; ok {
foreign = true
switch foreignType {
case siCommon.AllocTypeStatic:
preemptable = false
case siCommon.AllocTypeDefault:
default:
log.Log(log.SchedAllocation).Warn("Foreign tag has illegal value, using default",
zap.String("value", foreignType))
}
}

var allocated bool
var nodeID string
var bindTime time.Time
Expand Down Expand Up @@ -135,6 +151,8 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allocated: allocated,
nodeID: nodeID,
bindTime: bindTime,
foreign: foreign,
preemptable: preemptable,
}
}

Expand Down Expand Up @@ -573,3 +591,11 @@ func (a *Allocation) setUserQuotaCheckPassed() {
a.askEvents.SendRequestFitsInUserQuota(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) IsForeign() bool {
return a.foreign
}

func (a *Allocation) IsPreemptable() bool {
return a.preemptable
}
33 changes: 33 additions & 0 deletions pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,36 @@ func TestNewAllocFromSI(t *testing.T) {
assert.Assert(t, !alloc.IsAllowPreemptSelf(), "alloc should not have allow-preempt-self set")
assert.Assert(t, !alloc.IsAllowPreemptOther(), "alloc should not have allow-preempt-other set")
}

func TestNewForeignAllocFromSI(t *testing.T) {
res := resources.NewResourceFromMap(map[string]resources.Quantity{
"first": 1,
})
siAlloc := &si.Allocation{
AllocationKey: "foreign-1",
NodeID: "node-1",
ResourcePerAlloc: res.ToProto(),
TaskGroupName: "",
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
}

// default
alloc := NewAllocationFromSI(siAlloc)
assert.Assert(t, alloc.IsPreemptable())
assert.Assert(t, alloc.IsForeign())
assert.Equal(t, "foreign-1", alloc.GetAllocationKey())
assert.Equal(t, "node-1", alloc.GetNodeID())
assert.Assert(t, resources.Equals(res, alloc.GetAllocatedResource()))

// static
siAlloc.AllocationTags[siCommon.Foreign] = siCommon.AllocTypeStatic
alloc = NewAllocationFromSI(siAlloc)
assert.Assert(t, !alloc.IsPreemptable())

// illegal value for foreign type
siAlloc.AllocationTags[siCommon.Foreign] = "xyz"
alloc = NewAllocationFromSI(siAlloc)
assert.Assert(t, alloc.IsPreemptable())
}
51 changes: 45 additions & 6 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,31 @@ func (sn *Node) GetAllAllocations() []*Allocation {
return arr
}

// GetYunikornAllocations returns a copy of Yunikorn allocations on this node
func (sn *Node) GetYunikornAllocations() []*Allocation {
sn.RLock()
defer sn.RUnlock()
return sn.getAllocations(false)
}

// GetForeignAllocations returns a copy of non-Yunikorn allocations on this node
func (sn *Node) GetForeignAllocations() []*Allocation {
sn.RLock()
defer sn.RUnlock()
return sn.getAllocations(true)
}

func (sn *Node) getAllocations(foreign bool) []*Allocation {
arr := make([]*Allocation, 0)
for _, v := range sn.allocations {
if v.IsForeign() == foreign {
arr = append(arr, v)
}
}

return arr
}

// Set the node to unschedulable.
// This will cause the node to be skipped during the scheduling cycle.
// Visible for testing only
Expand Down Expand Up @@ -312,15 +337,24 @@ func (sn *Node) FitInNode(resRequest *resources.Resource) bool {
// is found the Allocation removed is returned. Used resources will decrease available
// will increase as per the allocation removed.
func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
defer sn.notifyListeners()
var alloc *Allocation
defer func() {
if alloc != nil && !alloc.IsForeign() {
sn.notifyListeners()
}
}()
sn.Lock()
defer sn.Unlock()

alloc := sn.allocations[allocationKey]
alloc = sn.allocations[allocationKey]
if alloc != nil {
delete(sn.allocations, allocationKey)
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.allocatedResource.Prune()
if alloc.IsForeign() {
sn.occupiedResource = resources.Sub(sn.occupiedResource, alloc.GetAllocatedResource())
} else {
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.allocatedResource.Prune()
}
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource())
return alloc
Expand Down Expand Up @@ -348,9 +382,10 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
return false
}
result := false
foreign := alloc.IsForeign()
defer func() {
// check result to ensure we don't notify listeners unnecessarily
if result {
if result && !foreign {
sn.notifyListeners()
}
}()
Expand All @@ -361,7 +396,11 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
res := alloc.GetAllocatedResource()
if force || sn.availableResource.FitIn(res) {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.allocatedResource.AddTo(res)
if foreign {
sn.occupiedResource = resources.Add(sn.occupiedResource, alloc.GetAllocatedResource())
} else {
sn.allocatedResource.AddTo(res)
}
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
Expand Down
Loading
Loading