diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go
index fa48b3523e..de77204a84 100644
--- a/pkg/datamover/backup_micro_service.go
+++ b/pkg/datamover/backup_micro_service.go
@@ -95,7 +95,7 @@ func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient
 }
 
 func (r *BackupMicroService) Init() error {
-	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName)
+	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName, r.logger)
 
 	handler, err := r.duInformer.AddEventHandler(
 		cachetool.ResourceEventHandlerFuncs{
@@ -222,6 +222,8 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
 		log.WithError(err).Error("Async fs backup was not completed")
 	}
 
+	r.eventRecorder.EndingEvent(du, false, datapath.EventReasonStopped, "Data path for %s stopped", du.Name)
+
 	return result, err
 }
diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go
index cca428ee88..90aff37b3c 100644
--- a/pkg/datamover/backup_micro_service_test.go
+++ b/pkg/datamover/backup_micro_service_test.go
@@ -62,6 +62,15 @@ func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, mes
 	bt.eventReason = reason
 	bt.eventMsg = fmt.Sprintf(message, a...)
 }
+
+func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
+	bt.eventLock.Lock()
+	defer bt.eventLock.Unlock()
+
+	bt.withEvent = true
+	bt.eventReason = reason
+	bt.eventMsg = fmt.Sprintf(message, a...)
+}
 func (bt *backupMsTestHelper) Shutdown() {}
 
 func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
@@ -336,7 +345,7 @@ func TestRunCancelableDataPath(t *testing.T) {
 			ctx:              ctxTimeout,
 			kubeClientObj:    []runtime.Object{duInProgress},
 			dataPathStarted:  true,
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
 			expectedErr:      "timed out waiting for fs backup to complete",
 		},
 		{
@@ -347,7 +356,7 @@ func TestRunCancelableDataPath(t *testing.T) {
 			result: &dataPathResult{
 				err: errors.New("fake-data-path-error"),
 			},
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
 			expectedErr:      "fake-data-path-error",
 		},
 		{
@@ -358,7 +367,7 @@ func TestRunCancelableDataPath(t *testing.T) {
 			result: &dataPathResult{
 				result: "fake-succeed-result",
 			},
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
 		},
 	}
diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go
index d0a4c6f50c..1746366c9d 100644
--- a/pkg/datamover/restore_micro_service.go
+++ b/pkg/datamover/restore_micro_service.go
@@ -84,7 +84,7 @@ func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClien
 }
 
 func (r *RestoreMicroService) Init() error {
-	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName)
+	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName, r.logger)
 
 	handler, err := r.ddInformer.AddEventHandler(
 		cachetool.ResourceEventHandlerFuncs{
@@ -199,6 +199,8 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
 		log.WithError(err).Error("Async fs restore was not completed")
 	}
 
+	r.eventRecorder.EndingEvent(dd, false, datapath.EventReasonStopped, "Data path for %s stopped", dd.Name)
+
 	return result, err
 }
diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go
index 8a3ed61e1f..c2e9ca4c20 100644
--- a/pkg/datamover/restore_micro_service_test.go
+++ b/pkg/datamover/restore_micro_service_test.go
@@ -289,7 +289,7 @@ func TestRunCancelableRestore(t *testing.T) {
 			ctx:              ctxTimeout,
 			kubeClientObj:    []runtime.Object{ddInProgress},
 			dataPathStarted:  true,
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
 			expectedErr:      "timed out waiting for fs restore to complete",
 		},
 		{
@@ -300,7 +300,7 @@ func TestRunCancelableRestore(t *testing.T) {
 			result: &dataPathResult{
 				err: errors.New("fake-data-path-error"),
 			},
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
 			expectedErr:      "fake-data-path-error",
 		},
 		{
@@ -311,7 +311,7 @@ func TestRunCancelableRestore(t *testing.T) {
 			result: &dataPathResult{
 				result: "fake-succeed-result",
 			},
-			expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
+			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
 		},
 	}
diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go
index d74ca2fc2c..d211996189 100644
--- a/pkg/datapath/micro_service_watcher.go
+++ b/pkg/datapath/micro_service_watcher.go
@@ -52,6 +52,7 @@ const (
 	EventReasonCancelled  = "Data-Path-Canceled"
 	EventReasonProgress   = "Data-Path-Progress"
 	EventReasonCancelling = "Data-Path-Canceling"
+	EventReasonStopped    = "Data-Path-Stopped"
 )
 
 type microServiceBRWatcher struct {
@@ -340,15 +341,15 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
 		ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
 	case EventReasonCompleted:
 		ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
-		ms.terminatedFromEvent = true
 	case EventReasonCancelled:
 		ms.log.Infof("Received data path canceled message: %s", evt.Message)
-		ms.terminatedFromEvent = true
 	case EventReasonFailed:
 		ms.log.Infof("Received data path failed message: %s", evt.Message)
-		ms.terminatedFromEvent = true
 	case EventReasonCancelling:
 		ms.log.Infof("Received data path canceling message: %s", evt.Message)
+	case EventReasonStopped:
+		ms.terminatedFromEvent = true
+		ms.log.Infof("Received data path stop message: %s", evt.Message)
 	default:
 		ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
 	}
diff --git a/pkg/util/kube/event.go b/pkg/util/kube/event.go
index d5a4bb8d22..de91d3533d 100644
--- a/pkg/util/kube/event.go
+++ b/pkg/util/kube/event.go
@@ -16,8 +16,11 @@ limitations under the License.
 package kube
 
 import (
+	"sync"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
@@ -27,16 +30,34 @@ import (
 
 type EventRecorder interface {
 	Event(object runtime.Object, warning bool, reason string, message string, a ...any)
+	EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any)
 	Shutdown()
 }
 
 type eventRecorder struct {
-	broadcaster record.EventBroadcaster
-	recorder    record.EventRecorder
+	broadcaster    record.EventBroadcaster
+	recorder       record.EventRecorder
+	lock           sync.Mutex
+	endingSentinel *eventElement
+	log            logrus.FieldLogger
 }
 
-func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
-	res := eventRecorder{}
+type eventElement struct {
+	t      string
+	r      string
+	m      string
+	sinked chan struct{}
+}
+
+type eventSink struct {
+	recorder *eventRecorder
+	sink     typedcorev1.EventInterface
+}
+
+func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string, log logrus.FieldLogger) EventRecorder {
+	res := eventRecorder{
+		log: log,
+	}
 
 	res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
 		MaxEvents: 1,
@@ -45,7 +66,11 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
 		},
 	})
 
-	res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	res.broadcaster.StartRecordingToSink(&eventSink{
+		recorder: &res,
+		sink:     kubeClient.CoreV1().Events(""),
+	})
+
 	res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
 		Component: eventSource,
 		Host:      eventNode,
@@ -55,6 +80,10 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
 }
 
 func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
+	if er.broadcaster == nil {
+		return
+	}
+
 	eventType := v1.EventTypeNormal
 	if warning {
 		eventType = v1.EventTypeWarning
@@ -67,8 +96,95 @@ func (er *eventRecorder) Event(object runtime.Object, warning bool, reason strin
 	}
 }
 
+func (er *eventRecorder) EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any) {
+	if er.broadcaster == nil {
+		return
+	}
+
+	er.Event(object, warning, reason, message, a...)
+
+	var sentinelEvent string
+
+	er.lock.Lock()
+	if er.endingSentinel == nil {
+		sentinelEvent = uuid.NewString()
+		er.endingSentinel = &eventElement{
+			t:      v1.EventTypeNormal,
+			r:      sentinelEvent,
+			m:      sentinelEvent,
+			sinked: make(chan struct{}),
+		}
+	}
+	er.lock.Unlock()
+
+	if sentinelEvent != "" {
+		er.Event(object, false, sentinelEvent, sentinelEvent)
+	} else {
+		er.log.Warn("More than one ending events, ignore")
+	}
+}
+
+var shutdownTimeout time.Duration = time.Minute
+
 func (er *eventRecorder) Shutdown() {
-	// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
-	time.Sleep(2 * time.Second)
+	var wait chan struct{}
+	er.lock.Lock()
+	if er.endingSentinel != nil {
+		wait = er.endingSentinel.sinked
+	}
+	er.lock.Unlock()
+
+	if wait != nil {
+		er.log.Info("Waiting sentinel before shutdown")
+
+	waitloop:
+		for {
+			select {
+			case <-wait:
+				break waitloop
+			case <-time.After(shutdownTimeout):
+				er.log.Warn("Timeout waiting for assured events processed")
+				break waitloop
+			}
+		}
+	}
+
 	er.broadcaster.Shutdown()
+	er.broadcaster = nil
+
+	er.lock.Lock()
+	er.endingSentinel = nil
+	er.lock.Unlock()
+}
+
+func (er *eventRecorder) sentinelWatch(event *v1.Event) bool {
+	er.lock.Lock()
+	defer er.lock.Unlock()
+
+	if er.endingSentinel == nil {
+		return false
+	}
+
+	if er.endingSentinel.m == event.Message && er.endingSentinel.r == event.Reason && er.endingSentinel.t == event.Type {
+		close(er.endingSentinel.sinked)
+		return true
+	}
+
+	return false
+}
+
+func (es *eventSink) Create(event *v1.Event) (*v1.Event, error) {
+	if es.recorder.sentinelWatch(event) {
+		return event, nil
+	}
+
+	return es.sink.CreateWithEventNamespace(event)
+}
+
+func (es *eventSink) Update(event *v1.Event) (*v1.Event, error) {
+	return es.sink.UpdateWithEventNamespace(event)
+}
+
+func (es *eventSink) Patch(event *v1.Event, data []byte) (*v1.Event, error) {
+	return es.sink.PatchWithEventNamespace(event, data)
+}
diff --git a/pkg/util/kube/event_test.go b/pkg/util/kube/event_test.go
index 5381425960..47bab32d69 100644
--- a/pkg/util/kube/event_test.go
+++ b/pkg/util/kube/event_test.go
@@ -18,7 +18,9 @@ package kube
 
 import (
 	"context"
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -28,35 +30,142 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 
 	corev1 "k8s.io/api/core/v1"
+
+	velerotest "github.com/vmware-tanzu/velero/pkg/test"
 )
 
 func TestEvent(t *testing.T) {
-	client := fake.NewSimpleClientset()
-
-	scheme := runtime.NewScheme()
-	err := corev1.AddToScheme(scheme)
-	require.NoError(t, err)
-
-	recorder := NewEventRecorder(client, scheme, "source-1", "fake-node")
+	type testEvent struct {
+		warning bool
+		reason  string
+		message string
+		ending  bool
+	}
 
-	pod := &corev1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Namespace: "fake-ns",
-			Name:      "fake-pod",
-			UID:       types.UID("fake-uid"),
+	cases := []struct {
+		name     string
+		events   []testEvent
+		expected int
+	}{
+		{
+			name: "update events, different message",
+			events: []testEvent{
+				{
+					warning: false,
+					reason:  "Progress",
+					message: "progress-1",
+				},
+				{
+					warning: false,
+					reason:  "Progress",
+					message: "progress-2",
+					ending:  true,
+				},
+			},
+			expected: 1,
 		},
-		Spec: corev1.PodSpec{
-			NodeName: "fake-node",
+		{
+			name: "create events, different reason",
+			events: []testEvent{
+				{
+					warning: false,
+					reason:  "action-1-1",
+					message: "fake-message",
+				},
+				{
+					warning: false,
+					reason:  "action-1-2",
+					message: "fake-message",
+					ending:  true,
+				},
+			},
+			expected: 2,
+		},
+		{
+			name: "create events, different warning",
+			events: []testEvent{
+				{
+					warning: false,
+					reason:  "action-2-1",
+					message: "fake-message",
+				},
+				{
+					warning: true,
+					reason:  "action-2-1",
+					message: "fake-message",
+					ending:  true,
+				},
+			},
+			expected: 2,
+		},
+		{
+			name: "endingEvent, double entrance",
+			events: []testEvent{
+				{
+					warning: false,
+					reason:  "action-2-1",
+					message: "fake-message",
+					ending:  true,
+				},
+				{
+					warning: true,
+					reason:  "action-2-1",
+					message: "fake-message",
+					ending:  true,
+				},
+			},
+			expected: -1,
 		},
 	}
 
-	recorder.Event(pod, false, "Progress", "progress-1")
-	recorder.Event(pod, false, "Progress", "progress-2")
+	shutdownTimeout = time.Second * 5
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			client := fake.NewSimpleClientset()
+			scheme := runtime.NewScheme()
+			err := corev1.AddToScheme(scheme)
+			require.NoError(t, err)
 
-	recorder.Shutdown()
+			recorder := NewEventRecorder(client, scheme, "source-1", "fake-node", velerotest.NewLogger())
 
-	items, err := client.CoreV1().Events("fake-ns").List(context.Background(), metav1.ListOptions{})
-	require.NoError(t, err)
+			pod := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "fake-ns",
+					Name:      "fake-pod",
+					UID:       types.UID("fake-uid"),
+				},
+				Spec: corev1.PodSpec{
+					NodeName: "fake-node",
+				},
+			}
+
+			_, err = client.CoreV1().Pods("fake-ns").Create(context.Background(), pod, metav1.CreateOptions{})
+			require.NoError(t, err)
+
+			for _, e := range tc.events {
+				if e.ending {
+					recorder.EndingEvent(pod, e.warning, e.reason, e.message)
+				} else {
+					recorder.Event(pod, e.warning, e.reason, e.message)
+				}
+			}
+
+			recorder.Shutdown()
+
+			items, err := client.CoreV1().Events("fake-ns").List(context.Background(), metav1.ListOptions{})
+			require.NoError(t, err)
+
+			if tc.expected != len(items.Items) {
+				for _, i := range items.Items {
+					fmt.Printf("event (%s, %s, %s)\n", i.Type, i.Message, i.Reason)
+				}
+			}
+
+			if tc.expected >= 0 {
+				assert.Len(t, items.Items, tc.expected)
+			}
+		})
+	}
 
-	assert.Len(t, items.Items, 1)
 }