Skip to content

Commit

Permalink
Merge pull request #519 from rollandf/events
Browse files Browse the repository at this point in the history
Add events for config daemon
  • Loading branch information
e0ne authored Oct 17, 2023
2 parents d5499fd + 6838aec commit bf595e4
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
8 changes: 7 additions & 1 deletion cmd/sriov-network-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.V(0).Info("dev mode enabled")
}

eventRecorder := daemon.NewEventRecorder(writerclient, startOpts.nodeName, kubeclient)
defer eventRecorder.Shutdown()

glog.V(0).Info("starting node writer")
nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, devMode)
nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, eventRecorder, devMode)

destdir := os.Getenv("DEST_DIR")
if destdir == "" {
Expand All @@ -187,6 +190,8 @@ func runStartCmd(cmd *cobra.Command, args []string) {
panic(err.Error())
}

eventRecorder.SendEvent("ConfigDaemonStart", "Config Daemon starting")

// block the deamon process until nodeWriter finish first its run
err = nodeWriter.RunOnce(destdir, platformType)
if err != nil {
Expand All @@ -207,6 +212,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
refreshCh,
platformType,
startOpts.systemd,
eventRecorder,
devMode,
).Run(stopCh, exitCh)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Daemon struct {
storeManager utils.StoreManagerInterface

hostManager host.HostManagerInterface

eventRecorder *EventRecorder
}

const (
Expand Down Expand Up @@ -149,6 +151,7 @@ func New(
refreshCh chan<- Message,
platformType utils.PlatformType,
useSystemdService bool,
er *EventRecorder,
devMode bool,
) *Daemon {
return &Daemon{
Expand Down Expand Up @@ -186,6 +189,7 @@ func New(
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"),
eventRecorder: er,
}
}

Expand Down Expand Up @@ -656,6 +660,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {

if reqReboot {
glog.Info("nodeStateSyncHandler(): reboot node")
dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated")
rebootNode()
return nil
}
Expand Down Expand Up @@ -1030,6 +1035,7 @@ func (dn *Daemon) drainNode() error {
var lastErr error

glog.Info("drainNode(): Start draining")
dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true)
if err != nil {
Expand All @@ -1048,9 +1054,11 @@ func (dn *Daemon) drainNode() error {
if err == wait.ErrWaitTimeout {
glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr)
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node failed")
glog.Errorf("drainNode(): failed to drain node: %v", err)
return err
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node completed")
glog.Info("drainNode(): drain complete")
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ var _ = Describe("Config Daemon", func() {
err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, namespace)
Expect(err).ToNot(HaveOccurred())

er := NewEventRecorder(client, "test-node", kubeClient)

sut = New("test-node",
client,
kubeClient,
Expand All @@ -114,6 +116,7 @@ var _ = Describe("Config Daemon", func() {
refreshCh,
utils.Baremetal,
false,
er,
false,
)

Expand Down
52 changes: 52 additions & 0 deletions pkg/daemon/event_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package daemon

import (
"context"

"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
)

type EventRecorder struct {
client snclientset.Interface
node string
eventRecorder record.EventRecorder
eventBroadcaster record.EventBroadcaster
}

// NewEventRecorder Create a new EventRecorder
func NewEventRecorder(c snclientset.Interface, n string, kubeclient kubernetes.Interface) *EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(4)
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: kubeclient.CoreV1().Events("")})
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "config-daemon"})
return &EventRecorder{
client: c,
node: n,
eventRecorder: eventRecorder,
eventBroadcaster: eventBroadcaster,
}
}

// SendEvent Send an Event on the NodeState object
func (e *EventRecorder) SendEvent(eventType string, msg string) {
nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get(context.Background(), e.node, metav1.GetOptions{})
if err != nil {
glog.Warningf("SendEvent(): Failed to fetch node state %s (%v); skip SendEvent", e.node, err)
return
}
e.eventRecorder.Event(nodeState, corev1.EventTypeNormal, eventType, msg)
}

// Shutdown Close the EventBroadcaster
func (e *EventRecorder) Shutdown() {
e.eventBroadcaster.Shutdown()
}
24 changes: 21 additions & 3 deletions pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
CheckpointFileName = "sno-initial-node-state.json"
Unknown = "Unknown"
)

type NodeStateStatusWriter struct {
Expand All @@ -31,14 +32,16 @@ type NodeStateStatusWriter struct {
openStackDevicesInfo utils.OSPDevicesInfo
withUnsupportedDevices bool
storeManager utils.StoreManagerInterface
eventRecorder *EventRecorder
}

// NewNodeStateStatusWriter Create a new NodeStateStatusWriter
func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), devMode bool) *NodeStateStatusWriter {
func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), er *EventRecorder, devMode bool) *NodeStateStatusWriter {
return &NodeStateStatusWriter{
client: c,
node: n,
OnHeartbeatFailure: f,
eventRecorder: er,
withUnsupportedDevices: devMode,
}
}
Expand Down Expand Up @@ -170,9 +173,24 @@ func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1
// clear lastSyncError when sync Succeeded
nodeState.Status.LastSyncError = msg.lastSyncError
}
nodeState.Status.SyncStatus = msg.syncStatus

oldStatus := nodeState.Status.SyncStatus
newStatus := msg.syncStatus
nodeState.Status.SyncStatus = newStatus
glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)

if oldStatus != newStatus {
if oldStatus == "" {
oldStatus = Unknown
}
if newStatus == "" {
newStatus = Unknown
}
eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus)
if nodeState.Status.LastSyncError != "" {
eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, nodeState.Status.LastSyncError)
}
w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg)
}
})
if err != nil {
return nil, err
Expand Down

0 comments on commit bf595e4

Please sign in to comment.