Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of numa: fix scheduler panic due to topology serialization bug into release/1.8.x #23300

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/fingerprint/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) {
return
}

nodes := f.top.Nodes()
nodes := f.top.GetNodes()
response.AddAttribute("numa.node.count", f.nodes(nodes.Size()))

nodes.ForEach(func(id hw.NodeID) error {
Expand Down
2 changes: 1 addition & 1 deletion client/lib/idset/idset.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *Set[T]) Size() int {

// Empty returns whether the set is empty.
func (s *Set[T]) Empty() bool {
if s == nil {
if s == nil || s.items == nil {
return true
}
return s.items.Empty()
Expand Down
4 changes: 2 additions & 2 deletions client/lib/numalib/detect_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type MacOS struct{}

func (m *MacOS) ScanSystem(top *Topology) {
// all apple hardware is non-numa; just assume as much
top.NodeIDs = idset.Empty[hw.NodeID]()
top.NodeIDs.Insert(nodeID)
top.nodeIDs = idset.Empty[hw.NodeID]()
top.nodeIDs.Insert(nodeID)

// arch specific detection
switch m1cpu.IsAppleSilicon() {
Expand Down
5 changes: 3 additions & 2 deletions client/lib/numalib/detect_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ const (
func scanGeneric(top *Topology) {
// hardware may or may not be NUMA, but for now we only
// detect such topology on linux systems
top.NodeIDs = idset.Empty[hw.NodeID]()
top.NodeIDs.Insert(genericNodeID)
top.nodeIDs = idset.Empty[hw.NodeID]()
top.nodeIDs.Insert(genericNodeID)
top.Nodes = top.nodeIDs.Slice()

// cores
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
23 changes: 13 additions & 10 deletions client/lib/numalib/detect_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,23 @@ func (*Sysfs) available() bool {
func (*Sysfs) discoverOnline(st *Topology, readerFunc pathReaderFn) {
ids, err := getIDSet[hw.NodeID](nodeOnline, readerFunc)
if err == nil {
st.NodeIDs = ids
st.nodeIDs = ids
st.Nodes = st.nodeIDs.Slice()
}
}

func (*Sysfs) discoverCosts(st *Topology, readerFunc pathReaderFn) {
if st.NodeIDs.Empty() {
if st.nodeIDs.Empty() {
return
}

dimension := st.NodeIDs.Size()
st.Distances = make(SLIT, st.NodeIDs.Size())
dimension := st.nodeIDs.Size()
st.Distances = make(SLIT, st.nodeIDs.Size())
for i := 0; i < dimension; i++ {
st.Distances[i] = make([]Cost, dimension)
}

_ = st.NodeIDs.ForEach(func(id hw.NodeID) error {
_ = st.nodeIDs.ForEach(func(id hw.NodeID) error {
s, err := getString(distanceFile, readerFunc, id)
if err != nil {
return err
Expand All @@ -104,20 +105,21 @@ func (*Sysfs) discoverCores(st *Topology, readerFunc pathReaderFn) {
st.Cores = make([]Core, onlineCores.Size())

switch {
case st.NodeIDs == nil:
case st.nodeIDs == nil:
// We did not find node data, no node to associate with
_ = onlineCores.ForEach(func(core hw.CoreID) error {
st.NodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
st.nodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
const node = 0
const socket = 0
cpuMax, _ := getNumeric[hw.KHz](cpuMaxFile, 64, readerFunc, core)
base, _ := getNumeric[hw.KHz](cpuBaseFile, 64, readerFunc, core)
st.insert(node, socket, core, Performance, cpuMax, base)
st.Nodes = st.nodeIDs.Slice()
return nil
})
default:
// We found node data, associate cores to nodes
_ = st.NodeIDs.ForEach(func(node hw.NodeID) error {
_ = st.nodeIDs.ForEach(func(node hw.NodeID) error {
s, err := readerFunc(fmt.Sprintf(cpulistFile, node))
if err != nil {
return err
Expand Down Expand Up @@ -231,7 +233,7 @@ func (s *Fallback) ScanSystem(top *Topology) {
broken := false

switch {
case top.NodeIDs.Empty():
case top.nodeIDs.Empty():
broken = true
case len(top.Distances) == 0:
broken = true
Expand All @@ -251,7 +253,8 @@ func (s *Fallback) ScanSystem(top *Topology) {

// we have a broken topology; reset it and fallback to the generic scanner
// basically treating this client like a windows / unsupported OS
top.NodeIDs = nil
top.nodeIDs = nil
top.Nodes = nil
top.Distances = nil
top.Cores = nil

Expand Down
12 changes: 7 additions & 5 deletions client/lib/numalib/detect_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestSysfs_discoverOnline(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
sy.discoverOnline(st, tt.readerFunc)
must.Eq(t, tt.expectedIDSet, st.NodeIDs)
must.Eq(t, tt.expectedIDSet, st.GetNodes())
})
}
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestSysfs_discoverCosts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
st.NodeIDs = tt.nodeIDs
st.SetNodes(tt.nodeIDs)
sy.discoverCosts(st, tt.readerFunc)
must.Eq(t, tt.expectedDistances, st.Distances)
})
Expand All @@ -136,7 +136,8 @@ func TestSysfs_discoverCores(t *testing.T) {

// issue#19372
{"one node and bad sys data", oneNode, badSysData, &Topology{
NodeIDs: oneNode,
nodeIDs: oneNode,
Nodes: oneNode.Slice(),
Cores: []Core{
{
SocketID: 0,
Expand All @@ -157,7 +158,8 @@ func TestSysfs_discoverCores(t *testing.T) {
},
}},
{"two nodes and good sys data", twoNodes, goodSysData, &Topology{
NodeIDs: twoNodes,
nodeIDs: twoNodes,
Nodes: twoNodes.Slice(),
Cores: []Core{
{
SocketID: 1,
Expand Down Expand Up @@ -197,7 +199,7 @@ func TestSysfs_discoverCores(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
st.NodeIDs = tt.nodeIDs
st.SetNodes(tt.nodeIDs)
sy.discoverCores(st, tt.readerFunc)
must.Eq(t, tt.expectedTopology, st)
})
Expand Down
36 changes: 31 additions & 5 deletions client/lib/numalib/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ type (
// The JSON encoding is not used yet but may be part of the gRPC plumbing
// in the future.
type Topology struct {
NodeIDs *idset.Set[hw.NodeID]
// COMPAT: idset.Set wasn't being serialized correctly, but we can't change
// the encoding of a field once it's shipped. Nodes is the wire
// representation.
nodeIDs *idset.Set[hw.NodeID]
Nodes []uint8

Distances SLIT
Cores []Core

Expand All @@ -66,7 +71,25 @@ type Topology struct {
// NewTopology is a constructor for the Topology object, only used in tests for
// mocking.
func NewTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology {
return &Topology{NodeIDs: nodeIDs, Distances: distances, Cores: cores}
t := &Topology{
nodeIDs: nodeIDs,
Distances: distances, Cores: cores}
t.SetNodes(nodeIDs)
return t
}

// SetNodes stores the given set of NUMA node IDs on the topology and keeps the
// exported Nodes slice in step with it. When the set reports itself as empty,
// Nodes is set to an empty, non-nil slice rather than the set's own slice.
func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
	t.nodeIDs = nodes

	// Guard: nothing to mirror, so publish an explicit empty slice.
	if nodes.Empty() {
		t.Nodes = []uint8{}
		return
	}

	t.Nodes = nodes.Slice()
}

// SetNodesFrom populates the topology from a plain slice of node IDs. The
// slice is stored as-is in Nodes (it is not copied), and the private node ID
// set is built from the same values.
func (t *Topology) SetNodesFrom(nodes []uint8) {
	t.Nodes = nodes
	t.nodeIDs = idset.From[hw.NodeID](nodes)
}

// A Core represents one logical (vCPU) core on a processor. Basically the slice
Expand Down Expand Up @@ -139,12 +162,15 @@ func (st *Topology) SupportsNUMA() bool {
}
}

// Nodes returns the set of NUMA Node IDs.
func (st *Topology) Nodes() *idset.Set[hw.NodeID] {
// GetNodes returns the set of NUMA Node IDs.
func (st *Topology) GetNodes() *idset.Set[hw.NodeID] {
if !st.SupportsNUMA() {
return nil
}
return st.NodeIDs
if st.nodeIDs.Empty() {
st.nodeIDs = idset.From[hw.NodeID](st.Nodes)
}
return st.nodeIDs
}

// NodeCores returns the set of Core IDs for the given NUMA Node ID.
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
node := mock.Node()
node.NodeResources.Processors = structs.NodeProcessorResources{
Topology: &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
Expand All @@ -142,6 +141,7 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
}},
},
}
node.NodeResources.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
node.NodeResources.Compatibility()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

Expand Down
9 changes: 5 additions & 4 deletions nomad/structs/cpucompat_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {

withheld := (frequency * hw.MHz(coreCount)) - hw.MHz(old.CpuShares)

return &numalib.Topology{
// legacy: assume one node with id 0
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),

t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},

Expand All @@ -47,4 +44,8 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}

// legacy: assume one node with id 0
t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
return t
}
9 changes: 5 additions & 4 deletions nomad/structs/cpucompat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {

withheld := (frequency * hw.MHz(old.TotalCpuCores)) - hw.MHz(old.CpuShares)

return &numalib.Topology{
// legacy: assume one node with id 0
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),

t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},

Expand All @@ -87,4 +84,8 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}

// legacy: assume one node with id 0
t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
return t
}
8 changes: 4 additions & 4 deletions nomad/structs/cpucompat_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
result := topologyFromLegacy(old)

exp := &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(0),
Expand All @@ -40,12 +39,13 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
OverrideTotalCompute: 12800,
OverrideWitholdCompute: 0,
}
exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))

// only compares total compute
must.Equal(t, exp, result)

// check underlying fields
must.Eq(t, exp.NodeIDs, result.NodeIDs)
must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)
Expand All @@ -66,7 +66,6 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
result := topologyFromLegacy(old)

exp := &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(1),
Expand All @@ -76,12 +75,13 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
OverrideTotalCompute: 9600,
OverrideWitholdCompute: 3200, // core 0 excluded
}
exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))

// only compares total compute
must.Equal(t, exp, result)

// check underlying fields
must.Eq(t, exp.NodeIDs, result.NodeIDs)
must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)
Expand Down
41 changes: 37 additions & 4 deletions nomad/structs/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,52 @@ package structs

import (
"reflect"

"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/helper"
)

var (
// extendedTypes is a mapping of extended types to their extension function
// TODO: the duplicates could be simplified by looking up the base type in the case of a pointer type in ConvertExt
extendedTypes = map[reflect.Type]extendFunc{
reflect.TypeOf(Node{}): nodeExt,
reflect.TypeOf(&Node{}): nodeExt,
reflect.TypeOf(CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
reflect.TypeOf(Node{}): nodeExt,
reflect.TypeOf(&Node{}): nodeExt,
reflect.TypeOf(CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&numalib.Topology{}): numaTopoExt,
}
)

// numaTopoExt is the JSON encoding extension for *numalib.Topology. It exists
// for two reasons: the topology keeps its idset.Set in a private field, and
// the NUMA node IDs must be emitted as []int, because go-msgpack would
// otherwise JSON encode a []uint8 as a base64-encoded bytestring instead of
// an array.
//
// The returned value carries the node IDs, distances, cores and the two
// compute override values. Nodes is left nil when GetNodes returns nil.
func numaTopoExt(v interface{}) interface{} {
	topo := v.(*numalib.Topology)

	// Widen each node ID from uint8 to int so it is encoded as a number.
	var nodes []int
	if topo.GetNodes() != nil {
		ids := topo.GetNodes().Slice()
		nodes = helper.ConvertSlice(ids, func(id uint8) int { return int(id) })
	}

	encoded := &struct {
		Nodes                  []int
		Distances              numalib.SLIT
		Cores                  []numalib.Core
		OverrideTotalCompute   hw.MHz
		OverrideWitholdCompute hw.MHz
	}{}
	encoded.Nodes = nodes
	encoded.Distances = topo.Distances
	encoded.Cores = topo.Cores
	encoded.OverrideTotalCompute = topo.OverrideTotalCompute
	encoded.OverrideWitholdCompute = topo.OverrideWitholdCompute

	return encoded
}

// nodeExt ensures the node is sanitized and adds the legacy field .Drain back to encoded Node objects
func nodeExt(v interface{}) interface{} {
node := v.(*Node).Sanitize()
Expand Down
Loading
Loading