Skip to content

Commit

Permalink
Add events for config daemon
Browse files Browse the repository at this point in the history
K8s Events on SriovNetworkNodeStates objects
will be created on following stages:

- Status change of SriovNetworkNodeStates
- Start of config daemon
- Node reboot started
- Node drain started/finished

Signed-off-by: Fred Rolland <frolland@nvidia.com>
  • Loading branch information
rollandf committed Oct 10, 2023
1 parent 8d53a20 commit 1ee7e8a
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
8 changes: 7 additions & 1 deletion cmd/sriov-network-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.V(0).Info("dev mode enabled")
}

eventRecorder := daemon.NewEventRecorder(writerclient, startOpts.nodeName, kubeclient)
defer eventRecorder.Shutdown()

glog.V(0).Info("starting node writer")
nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, devMode)
nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, eventRecorder, devMode)

destdir := os.Getenv("DEST_DIR")
if destdir == "" {
Expand All @@ -187,6 +190,8 @@ func runStartCmd(cmd *cobra.Command, args []string) {
panic(err.Error())
}

eventRecorder.SendEvent("ConfigDaemonStart", "Config Daemon starting")

// block the deamon process until nodeWriter finish first its run
err = nodeWriter.RunOnce(destdir, platformType)
if err != nil {
Expand All @@ -207,6 +212,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
refreshCh,
platformType,
startOpts.systemd,
eventRecorder,
devMode,
).Run(stopCh, exitCh)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Daemon struct {
storeManager utils.StoreManagerInterface

hostManager host.HostManagerInterface

eventRecorder *EventRecorder
}

const (
Expand Down Expand Up @@ -149,6 +151,7 @@ func New(
refreshCh chan<- Message,
platformType utils.PlatformType,
useSystemdService bool,
er *EventRecorder,
devMode bool,
) *Daemon {
return &Daemon{
Expand Down Expand Up @@ -186,6 +189,7 @@ func New(
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
eventRecorder: er,
}
}

Expand Down Expand Up @@ -656,6 +660,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {

if reqReboot {
glog.Info("nodeStateSyncHandler(): reboot node")
dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
rebootNode()
return nil
}
Expand Down Expand Up @@ -1030,6 +1035,7 @@ func (dn *Daemon) drainNode() error {
var lastErr error

glog.Info("drainNode(): Start draining")
dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true)
if err != nil {
Expand All @@ -1048,9 +1054,11 @@ func (dn *Daemon) drainNode() error {
if err == wait.ErrWaitTimeout {
glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr)
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node failed")
glog.Errorf("drainNode(): failed to drain node: %v", err)
return err
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node completed")
glog.Info("drainNode(): drain complete")
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ var _ = Describe("Config Daemon", func() {
err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, namespace)
Expect(err).ToNot(HaveOccurred())

er := NewEventRecorder(client, "test-node", kubeClient)

sut = New("test-node",
client,
kubeClient,
Expand All @@ -114,6 +116,7 @@ var _ = Describe("Config Daemon", func() {
refreshCh,
utils.Baremetal,
false,
er,
false,
)

Expand Down
52 changes: 52 additions & 0 deletions pkg/daemon/event_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package daemon

import (
"context"

"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
)

type EventRecorder struct {
client snclientset.Interface
node string
eventRecorder record.EventRecorder
eventBroadcaster record.EventBroadcaster
}

// NewEventRecorder Create a new EventRecorder
func NewEventRecorder(c snclientset.Interface, n string, kubeclient kubernetes.Interface) *EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(4)
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: kubeclient.CoreV1().Events("")})
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "config-daemon"})
return &EventRecorder{
client: c,
node: n,
eventRecorder: eventRecorder,
eventBroadcaster: eventBroadcaster,
}
}

// SendEvent Send an Event on the NodeState object
func (e *EventRecorder) SendEvent(eventType string, msg string) {
nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get(context.Background(), e.node, metav1.GetOptions{})
if err != nil {
glog.Warningf("getNodeState(): Failed to fetch node state %s (%v); skip SendEvent", e.node, err)
return
}
e.eventRecorder.Event(nodeState, corev1.EventTypeNormal, eventType, msg)
}

// Shutdown Close the EventBroadcaster
func (e *EventRecorder) Shutdown() {
e.eventBroadcaster.Shutdown()
}
24 changes: 21 additions & 3 deletions pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
CheckpointFileName = "sno-initial-node-state.json"
Unknown = "Unknown"
)

type NodeStateStatusWriter struct {
Expand All @@ -31,14 +32,16 @@ type NodeStateStatusWriter struct {
openStackDevicesInfo utils.OSPDevicesInfo
withUnsupportedDevices bool
storeManager utils.StoreManagerInterface
eventRecorder *EventRecorder
}

// NewNodeStateStatusWriter Create a new NodeStateStatusWriter
func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), devMode bool) *NodeStateStatusWriter {
func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), er *EventRecorder, devMode bool) *NodeStateStatusWriter {
return &NodeStateStatusWriter{
client: c,
node: n,
OnHeartbeatFailure: f,
eventRecorder: er,
withUnsupportedDevices: devMode,
}
}
Expand Down Expand Up @@ -170,9 +173,24 @@ func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1
// clear lastSyncError when sync Succeeded
nodeState.Status.LastSyncError = msg.lastSyncError
}
nodeState.Status.SyncStatus = msg.syncStatus

oldStatus := nodeState.Status.SyncStatus
newStatus := msg.syncStatus
nodeState.Status.SyncStatus = newStatus
glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)

if oldStatus != newStatus {
if oldStatus == "" {
oldStatus = Unknown
}
if newStatus == "" {
newStatus = Unknown
}
eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus)
if nodeState.Status.LastSyncError != "" {
eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, nodeState.Status.LastSyncError)
}
w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg)
}
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 1ee7e8a

Please sign in to comment.