Skip to content

Commit

Permalink
Merge 7d73065 into backport/numa-nodeids-serialization/brightly-fast-…
Browse files Browse the repository at this point in the history
…aphid
  • Loading branch information
hc-github-team-nomad-core authored Jun 11, 2024
2 parents ac9e364 + 7d73065 commit 36ef5f8
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 61 deletions.
2 changes: 1 addition & 1 deletion client/fingerprint/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) {
return
}

nodes := f.top.Nodes()
nodes := f.top.GetNodes()
response.AddAttribute("numa.node.count", f.nodes(nodes.Size()))

nodes.ForEach(func(id hw.NodeID) error {
Expand Down
2 changes: 1 addition & 1 deletion client/lib/idset/idset.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *Set[T]) Size() int {

// Empty returns whether the set is empty.
func (s *Set[T]) Empty() bool {
if s == nil {
if s == nil || s.items == nil {
return true
}
return s.items.Empty()
Expand Down
4 changes: 2 additions & 2 deletions client/lib/numalib/detect_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type MacOS struct{}

func (m *MacOS) ScanSystem(top *Topology) {
// all apple hardware is non-numa; just assume as much
top.NodeIDs = idset.Empty[hw.NodeID]()
top.NodeIDs.Insert(nodeID)
top.nodeIDs = idset.Empty[hw.NodeID]()
top.nodeIDs.Insert(nodeID)

// arch specific detection
switch m1cpu.IsAppleSilicon() {
Expand Down
5 changes: 3 additions & 2 deletions client/lib/numalib/detect_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ const (
func scanGeneric(top *Topology) {
// hardware may or may not be NUMA, but for now we only
// detect such topology on linux systems
top.NodeIDs = idset.Empty[hw.NodeID]()
top.NodeIDs.Insert(genericNodeID)
top.nodeIDs = idset.Empty[hw.NodeID]()
top.nodeIDs.Insert(genericNodeID)
top.Nodes = top.nodeIDs.Slice()

// cores
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
23 changes: 13 additions & 10 deletions client/lib/numalib/detect_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,23 @@ func (*Sysfs) available() bool {
func (*Sysfs) discoverOnline(st *Topology, readerFunc pathReaderFn) {
ids, err := getIDSet[hw.NodeID](nodeOnline, readerFunc)
if err == nil {
st.NodeIDs = ids
st.nodeIDs = ids
st.Nodes = st.nodeIDs.Slice()
}
}

func (*Sysfs) discoverCosts(st *Topology, readerFunc pathReaderFn) {
if st.NodeIDs.Empty() {
if st.nodeIDs.Empty() {
return
}

dimension := st.NodeIDs.Size()
st.Distances = make(SLIT, st.NodeIDs.Size())
dimension := st.nodeIDs.Size()
st.Distances = make(SLIT, st.nodeIDs.Size())
for i := 0; i < dimension; i++ {
st.Distances[i] = make([]Cost, dimension)
}

_ = st.NodeIDs.ForEach(func(id hw.NodeID) error {
_ = st.nodeIDs.ForEach(func(id hw.NodeID) error {
s, err := getString(distanceFile, readerFunc, id)
if err != nil {
return err
Expand All @@ -104,20 +105,21 @@ func (*Sysfs) discoverCores(st *Topology, readerFunc pathReaderFn) {
st.Cores = make([]Core, onlineCores.Size())

switch {
case st.NodeIDs == nil:
case st.nodeIDs == nil:
// We did not find node data, no node to associate with
_ = onlineCores.ForEach(func(core hw.CoreID) error {
st.NodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
st.nodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
const node = 0
const socket = 0
cpuMax, _ := getNumeric[hw.KHz](cpuMaxFile, 64, readerFunc, core)
base, _ := getNumeric[hw.KHz](cpuBaseFile, 64, readerFunc, core)
st.insert(node, socket, core, Performance, cpuMax, base)
st.Nodes = st.nodeIDs.Slice()
return nil
})
default:
// We found node data, associate cores to nodes
_ = st.NodeIDs.ForEach(func(node hw.NodeID) error {
_ = st.nodeIDs.ForEach(func(node hw.NodeID) error {
s, err := readerFunc(fmt.Sprintf(cpulistFile, node))
if err != nil {
return err
Expand Down Expand Up @@ -231,7 +233,7 @@ func (s *Fallback) ScanSystem(top *Topology) {
broken := false

switch {
case top.NodeIDs.Empty():
case top.nodeIDs.Empty():
broken = true
case len(top.Distances) == 0:
broken = true
Expand All @@ -251,7 +253,8 @@ func (s *Fallback) ScanSystem(top *Topology) {

// we have a broken topology; reset it and fallback to the generic scanner
// basically treating this client like a windows / unsupported OS
top.NodeIDs = nil
top.nodeIDs = nil
top.Nodes = nil
top.Distances = nil
top.Cores = nil

Expand Down
12 changes: 7 additions & 5 deletions client/lib/numalib/detect_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestSysfs_discoverOnline(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
sy.discoverOnline(st, tt.readerFunc)
must.Eq(t, tt.expectedIDSet, st.NodeIDs)
must.Eq(t, tt.expectedIDSet, st.GetNodes())
})
}
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestSysfs_discoverCosts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
st.NodeIDs = tt.nodeIDs
st.SetNodes(tt.nodeIDs)
sy.discoverCosts(st, tt.readerFunc)
must.Eq(t, tt.expectedDistances, st.Distances)
})
Expand All @@ -136,7 +136,8 @@ func TestSysfs_discoverCores(t *testing.T) {

// issue#19372
{"one node and bad sys data", oneNode, badSysData, &Topology{
NodeIDs: oneNode,
nodeIDs: oneNode,
Nodes: oneNode.Slice(),
Cores: []Core{
{
SocketID: 0,
Expand All @@ -157,7 +158,8 @@ func TestSysfs_discoverCores(t *testing.T) {
},
}},
{"two nodes and good sys data", twoNodes, goodSysData, &Topology{
NodeIDs: twoNodes,
nodeIDs: twoNodes,
Nodes: twoNodes.Slice(),
Cores: []Core{
{
SocketID: 1,
Expand Down Expand Up @@ -197,7 +199,7 @@ func TestSysfs_discoverCores(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
st.NodeIDs = tt.nodeIDs
st.SetNodes(tt.nodeIDs)
sy.discoverCores(st, tt.readerFunc)
must.Eq(t, tt.expectedTopology, st)
})
Expand Down
36 changes: 31 additions & 5 deletions client/lib/numalib/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ type (
// The JSON encoding is not used yet but my be part of the gRPC plumbing
// in the future.
type Topology struct {
NodeIDs *idset.Set[hw.NodeID]
// COMPAT: idset.Set wasn't being serialized correctly but we can't change
// the encoding of a field once its shipped. Nodes is the wire
// representation
nodeIDs *idset.Set[hw.NodeID]
Nodes []uint8

Distances SLIT
Cores []Core

Expand All @@ -66,7 +71,25 @@ type Topology struct {
// NewTopology is a constructor for the Topology object, only used in tests for
// mocking.
func NewTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology {
return &Topology{NodeIDs: nodeIDs, Distances: distances, Cores: cores}
t := &Topology{
nodeIDs: nodeIDs,
Distances: distances, Cores: cores}
t.SetNodes(nodeIDs)
return t
}

func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
t.nodeIDs = nodes
if !nodes.Empty() {
t.Nodes = nodes.Slice()
} else {
t.Nodes = []uint8{}
}
}

func (t *Topology) SetNodesFrom(nodes []uint8) {
t.nodeIDs = idset.From[hw.NodeID](nodes)
t.Nodes = nodes
}

// A Core represents one logical (vCPU) core on a processor. Basically the slice
Expand Down Expand Up @@ -139,12 +162,15 @@ func (st *Topology) SupportsNUMA() bool {
}
}

// Nodes returns the set of NUMA Node IDs.
func (st *Topology) Nodes() *idset.Set[hw.NodeID] {
// GetNodes returns the set of NUMA Node IDs.
func (st *Topology) GetNodes() *idset.Set[hw.NodeID] {
if !st.SupportsNUMA() {
return nil
}
return st.NodeIDs
if st.nodeIDs.Empty() {
st.nodeIDs = idset.From[hw.NodeID](st.Nodes)
}
return st.nodeIDs
}

// NodeCores returns the set of Core IDs for the given NUMA Node ID.
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
node := mock.Node()
node.NodeResources.Processors = structs.NodeProcessorResources{
Topology: &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
Expand All @@ -142,6 +141,7 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
}},
},
}
node.NodeResources.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
node.NodeResources.Compatibility()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

Expand Down
9 changes: 5 additions & 4 deletions nomad/structs/cpucompat_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {

withheld := (frequency * hw.MHz(coreCount)) - hw.MHz(old.CpuShares)

return &numalib.Topology{
// legacy: assume one node with id 0
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),

t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},

Expand All @@ -47,4 +44,8 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}

// legacy: assume one node with id 0
t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
return t
}
9 changes: 5 additions & 4 deletions nomad/structs/cpucompat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {

withheld := (frequency * hw.MHz(old.TotalCpuCores)) - hw.MHz(old.CpuShares)

return &numalib.Topology{
// legacy: assume one node with id 0
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),

t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},

Expand All @@ -87,4 +84,8 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}

// legacy: assume one node with id 0
t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
return t
}
8 changes: 4 additions & 4 deletions nomad/structs/cpucompat_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
result := topologyFromLegacy(old)

exp := &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(0),
Expand All @@ -40,12 +39,13 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
OverrideTotalCompute: 12800,
OverrideWitholdCompute: 0,
}
exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))

// only compares total compute
must.Equal(t, exp, result)

// check underlying fields
must.Eq(t, exp.NodeIDs, result.NodeIDs)
must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)
Expand All @@ -66,7 +66,6 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
result := topologyFromLegacy(old)

exp := &numalib.Topology{
NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(1),
Expand All @@ -76,12 +75,13 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
OverrideTotalCompute: 9600,
OverrideWitholdCompute: 3200, // core 0 excluded
}
exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))

// only compares total compute
must.Equal(t, exp, result)

// check underlying fields
must.Eq(t, exp.NodeIDs, result.NodeIDs)
must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)
Expand Down
41 changes: 37 additions & 4 deletions nomad/structs/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,52 @@ package structs

import (
"reflect"

"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/helper"
)

var (
// extendedTypes is a mapping of extended types to their extension function
// TODO: the duplicates could be simplified by looking up the base type in the case of a pointer type in ConvertExt
extendedTypes = map[reflect.Type]extendFunc{
reflect.TypeOf(Node{}): nodeExt,
reflect.TypeOf(&Node{}): nodeExt,
reflect.TypeOf(CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
reflect.TypeOf(Node{}): nodeExt,
reflect.TypeOf(&Node{}): nodeExt,
reflect.TypeOf(CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
reflect.TypeOf(&numalib.Topology{}): numaTopoExt,
}
)

// numaTopoExt is used to JSON encode topology to correctly handle the private
// idset.Set fields and so that NUMA NodeIDs are encoded as []int because
// go-msgpack will further JSON encode []uint8 into a base64-encoded bytestring,
// rather than an array
func numaTopoExt(v interface{}) interface{} {
topo := v.(*numalib.Topology)

var nodes []int
if topo.GetNodes() != nil {
nodes = helper.ConvertSlice(
topo.GetNodes().Slice(), func(n uint8) int { return int(n) })
}

return &struct {
Nodes []int
Distances numalib.SLIT
Cores []numalib.Core
OverrideTotalCompute hw.MHz
OverrideWitholdCompute hw.MHz
}{
Nodes: nodes,
Distances: topo.Distances,
Cores: topo.Cores,
OverrideTotalCompute: topo.OverrideTotalCompute,
OverrideWitholdCompute: topo.OverrideWitholdCompute,
}
}

// nodeExt ensures the node is sanitized and adds the legacy field .Drain back to encoded Node objects
func nodeExt(v interface{}) interface{} {
node := v.(*Node).Sanitize()
Expand Down
Loading

0 comments on commit 36ef5f8

Please sign in to comment.