From 00cbdce9e5db862f1ed34544004a793f09ad056b Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Thu, 4 Jul 2024 13:48:55 -0400 Subject: [PATCH] O-RAN V3 Rest Api: Status Notification Signed-off-by: Jack Ding --- go.mod | 4 +- go.sum | 2 - server_test.go | 2 +- v2/routes.go | 67 +++++++++---------- v2/server.go | 4 +- v2/server_test.go | 2 +- .../redhat-cne/sdk-go/pkg/channel/data.go | 5 +- .../redhat-cne/sdk-go/pkg/event/event_ce.go | 8 +-- .../sdk-go/pkg/event/ptp/resource.go | 11 ++- .../sdk-go/pkg/event/ptp/syncstate.go | 12 ++++ .../redhat-cne/sdk-go/pkg/event/ptp/types.go | 13 +++- .../redhat-cne/sdk-go/pkg/pubsub/pubsub.go | 2 +- .../sdk-go/pkg/subscriber/subscriber.go | 6 +- .../pkg/subscriber/subscriber_reader.go | 7 +- .../pkg/subscriber/subscriber_writer.go | 8 +-- .../redhat-cne/sdk-go/v1/event/event.go | 8 +-- .../sdk-go/v1/subscriber/subscriber.go | 25 +++++-- vendor/modules.txt | 2 +- 18 files changed, 108 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index 4db048b..ea58dff 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,14 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/prometheus/client_golang v1.14.0 - github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 + github.com/redhat-cne/sdk-go v1.0.1-unpublished github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.7.0 ) +replace github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go + require ( github.com/BurntSushi/toml v0.3.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 7ff684e..210e666 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 h1:qDOGSHOtHRszd8FnM0GZVUvbIvHhZrw5GeccXYPwT04= -github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= diff --git a/server_test.go b/server_test.go index f72c7ee..a620c00 100644 --- a/server_test.go +++ b/server_test.go @@ -100,7 +100,7 @@ func TestMain(m *testing.M) { Subject: func(s string) *string { return &s }("topic"), }.AsV1(), } - _ = e.SetData(cloudevents.ApplicationJSON, cneEvent) + _ = e.SetData("", cneEvent) func() { defer func() { if err := recover(); err != nil { diff --git a/v2/routes.go b/v2/routes.go index 668dba5..141db6f 100644 --- a/v2/routes.go +++ b/v2/routes.go @@ -56,6 +56,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { return } sub := pubsub.PubSub{} + sub.SetVersion(API_VERSION) if err = json.Unmarshal(bodyBytes, &sub); err != nil { respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err)) localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) @@ -67,26 +68,18 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) return } - if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok { + clientIDs := s.subscriberAPI.GetClientIDByResource(sub.GetResource()) + if len(clientIDs) != 0 { respondWithStatusCode(w, http.StatusConflict, - fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation", - subExists.GetID())) + fmt.Sprintf("subscription (clientID: %s) with same resource already exists, skipping creation", + clientIDs[0])) return } id := uuid.New().String() sub.SetID(id) - sub.SetVersion(API_VERSION) - sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck - - // TODO: cleanup: local pubsub is no longer needed since we are using configMap - newSub, err := s.pubSubAPI.CreateSubscription(sub) - if err != nil { - respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err)) - localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) - return - } - addr := newSub.GetResource() + sub.SetURILocation(fmt.Sprintf("http://%s:%d%s%s/%s", s.apiHost, s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck + addr := sub.GetResource() // this is placeholder not sending back to report out := channel.DataChan{ @@ -119,7 +112,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { } restClient := restclient.New() - out.Data.SetID(newSub.ID) // set ID to the subscriptionID + // make sure event ID is unique + out.Data.SetID(uuid.New().String()) status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data) if err != nil { respondWithStatusCode(w, http.StatusBadRequest, @@ -139,7 +133,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { subs := subscriber.New(s.getClientIDFromURI(endPointURI)) _ = subs.SetEndPointURI(endPointURI) - subs.AddSubscription(newSub) + subs.AddSubscription(sub) subs.Action = channel.NEW cevent, _ := subs.CreateCloudEvents() cevent.SetSource(addr) @@ -162,10 +156,10 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) } else { out.Status = channel.SUCCESS - _ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj) + _ = out.Data.SetData("", updatedObj) log.Infof("subscription created successfully.") localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1) - respondWithJSON(w, http.StatusCreated, newSub) + respondWithJSON(w, http.StatusCreated, sub) } s.dataOut <- &out @@ -256,12 +250,15 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) { respondWithStatusCode(w, http.StatusNotFound, "") return } - sub, err := s.pubSubAPI.GetSubscription(subscriptionID) - if err != nil { - respondWithStatusCode(w, http.StatusNotFound, "") - return + + for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) { + sub, err := s.subscriberAPI.GetSubscription(c, subscriptionID) + if err == nil { + respondWithJSON(w, http.StatusOK, sub) + return + } } - respondWithJSON(w, http.StatusOK, sub) + respondWithStatusCode(w, http.StatusNotFound, "") } func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) { @@ -278,9 +275,11 @@ func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) { } respondWithJSON(w, http.StatusOK, pub) } + func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) { - b, err := s.pubSubAPI.GetSubscriptionsFromFile() + b, err := s.subscriberAPI.GetSubscriptions() if err != nil { + log.Errorf("error loading subscriber data %v", err) respondWithError(w, "error loading subscriber data") return } @@ -322,14 +321,13 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { return } - if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil { - localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) - respondWithStatusCode(w, http.StatusNotFound, err.Error()) + clientIDs := s.subscriberAPI.GetClientIDBySubID(subscriptionID) + if len(clientIDs) == 0 { + respondWithStatusCode(w, http.StatusNotFound, "") return } - // update configMap - for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) { + for _, c := range clientIDs { if err := s.subscriberAPI.DeleteSubscription(c, subscriptionID); err != nil { localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) respondWithStatusCode(w, http.StatusNotFound, err.Error()) @@ -337,6 +335,7 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { } } + // update configMap for _, subs := range s.subscriberAPI.SubscriberStore.Store { cevent, _ := subs.CreateCloudEvents() out := channel.DataChan{ @@ -441,12 +440,12 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { } } } else { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "subscription not found") return } if sub == nil { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "subscription not found") return } @@ -471,14 +470,14 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { // statusReceiveOverrideFn must return value for if s.statusReceiveOverrideFn != nil { if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { - respondWithError(w, statusErr.Error()) + respondWithStatusCode(w, http.StatusNotFound, statusErr.Error()) } else if out.Data != nil { respondWithJSON(w, http.StatusOK, *out.Data) } else { - respondWithError(w, "event not found") + respondWithStatusCode(w, http.StatusNotFound, "event not found") } } else { - respondWithError(w, "onReceive function not defined") + respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined") } } diff --git a/v2/server.go b/v2/server.go index eff5cc1..37678f9 100644 --- a/v2/server.go +++ b/v2/server.go @@ -81,6 +81,7 @@ const ( // Server defines rest routes server object type Server struct { port int + apiHost string apiPath string //use dataOut chanel to write to configMap dataOut chan<- *channel.DataChan @@ -138,12 +139,13 @@ type swaggReqAccepted struct { //nolint:deadcode,unused } // InitServer is used to supply configurations for rest routes server -func InitServer(port int, apiPath, storePath string, +func InitServer(port int, apiHost, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server { once.Do(func() { ServerInstance = &Server{ port: port, + apiHost: apiHost, apiPath: apiPath, dataOut: dataOut, closeCh: closeCh, diff --git a/v2/server_test.go b/v2/server_test.go index d805442..1472244 100644 --- a/v2/server_test.go +++ b/v2/server_test.go @@ -100,7 +100,7 @@ func TestMain(m *testing.M) { Subject: func(s string) *string { return &s }("topic"), }.AsV1(), } - _ = e.SetData(cloudevents.ApplicationJSON, cneEvent) + _ = e.SetData("", cneEvent) func() { defer func() { if err := recover(); err != nil { diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go index 7c20be5..45dc812 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go @@ -51,9 +51,8 @@ type StatusChan struct { // CreateCloudEvents ... func (d *DataChan) CreateCloudEvents(dataType string) (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSpecVersion(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetType(dataType) ce.SetSource(d.Address) ce.SetID(d.ClientID.String()) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go index 5fe66d4..afd5aa2 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go @@ -25,15 +25,13 @@ import ( // NewCloudEvent create new cloud event from cloud native events and pubsub func (e *Event) NewCloudEvent(ps *pubsub.PubSub) (*cloudevent.Event, error) { - ce := cloudevent.NewEvent(cloudevent.VersionV03) + ce := cloudevent.NewEvent(cloudevent.VersionV1) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetDataContentType(cloudevent.ApplicationJSON) - ce.SetSubject(e.Source) // subject is set to source of the event object ce.SetSource(ps.Resource) // bus address - ce.SetSpecVersion(cloudevent.VersionV03) + ce.SetSpecVersion(cloudevent.VersionV1) ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevent.ApplicationJSON, e.GetData()); err != nil { + if err := ce.SetData("", e.GetData()); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go index 10cd127..7d21726 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go @@ -18,27 +18,34 @@ package ptp type EventResource string const ( + // O-RAN 7.2.3.6 // GnssSyncStatus notification is signalled from equipment at state change GnssSyncStatus EventResource = "/sync/gnss-status/gnss-sync-status" + // O-RAN 7.2.3.8 // OsClockSyncState State of node OS clock synchronization is notified at state change OsClockSyncState EventResource = "/sync/sync-status/os-clock-sync-state" + // O-RAN 7.2.3.10 // PtpClockClass notification is generated when the clock-class changes. - PtpClockClass EventResource = "/sync/ptp-status/ptp-clock-class-change" + PtpClockClass EventResource = "/sync/ptp-status/clock-class" + // O-RAN 7.2.3.3 // PtpLockState notification is signalled from equipment at state change PtpLockState EventResource = "/sync/ptp-status/lock-state" + // O-RAN 7.2.3.11 // SynceClockQuality notification is generated when the clock-quality changes. SynceClockQuality EventResource = "/sync/synce-status/clock-quality" + // O-RAN 7.2.3.9 // SynceLockState Notification used to inform about synce synchronization state change SynceLockState EventResource = "/sync/synce-status/lock-state" // SynceLockStateExtended notification is signalled from equipment at state change, enhanced information SynceLockStateExtended EventResource = "/sync/synce-status/lock-state-extended" - // SyncStatusState State of equipment synchronization is notified at state change + // O-RAN 7.2.3.1 + // SyncStatusState is the overall synchronization health of the node, including the OS System Clock SyncStatusState EventResource = "/sync/sync-status/sync-state" ) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go index 741e337..4b2ee93 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go @@ -30,6 +30,18 @@ const ( // BOOTING ... BOOTING SyncState = "BOOTING" + // FAILURE_MULTIPATH is GNSS Sync Failure - Multipath condition detected + FAILURE_MULTIPATH SyncState = "FAILURE-MULTIPATH" + + // FAILURE_NOFIX is GNSS Sync Failure - Unknown + FAILURE_NOFIX SyncState = "FAILURE-NOFIX" + + // FAILURE_LOW_SNR is GNSS Sync Failure - Low SNR condition detected + FAILURE_LOW_SNR SyncState = "FAILURE-LOW-SNR" + + // FAILURE_PLL is GNSS Sync Failure - PLL is not functioning + FAILURE_PLL SyncState = "FAILURE-PLL" + // FREERUN ... FREERUN SyncState = "FREERUN" diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go index 9ff2865..ff6e5ea 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go @@ -18,27 +18,34 @@ package ptp type EventType string const ( + // O-RAN 7.2.3.6 // GnssStateChange is Notification used to inform about gnss synchronization state change GnssStateChange EventType = "event.sync.gnss-status.gnss-state-change" + // O-RAN 7.2.3.8 // OsClockSyncStateChange is the object contains information related to a notification OsClockSyncStateChange EventType = "event.sync.sync-status.os-clock-sync-state-change" + // O-RAN 7.2.3.10 // PtpClockClassChange is Notification used to inform about ptp clock class changes. PtpClockClassChange EventType = "event.sync.ptp-status.ptp-clock-class-change" + // O-RAN 7.2.3.3 // PtpStateChange is Notification used to inform about ptp synchronization state change PtpStateChange EventType = "event.sync.ptp-status.ptp-state-change" + // O-RAN 7.2.3.11 // SynceClockQualityChange is Notification used to inform about changes in the clock quality of the primary SyncE signal advertised in ESMC packets - SynceClockQualityChange EventType = "event.sync.synce-status.sync-clock-quality-change" + SynceClockQualityChange EventType = "event.sync.synce-status.synce-clock-quality-change" + // O-RAN 7.2.3.9 // SynceStateChange is Notification used to inform about synce synchronization state change - SynceStateChange EventType = "event.sync.sync-status.synce-state-change" + SynceStateChange EventType = "event.sync.synce-status.synce-state-change" // SynceStateChangeExtended is Notification used to inform about synce synchronization state change, enhanced state information SynceStateChangeExtended EventType = "event.sync.synce-status.synce-state-change-extended" - // SyncStateChange is Notification used to inform about synchronization state change + // O-RAN 7.2.3.1 + // SyncStateChange is Notification used to inform about the overall synchronization state change SyncStateChange EventType = "event.sync.sync-status.synchronization-state-change" ) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go index 428c9bf..be7afd0 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go @@ -52,7 +52,7 @@ func (ps *PubSub) String() string { b := strings.Builder{} b.WriteString(" EndpointUri: " + ps.GetEndpointURI() + "\n") b.WriteString(" UriLocation: " + ps.GetURILocation() + "\n") - b.WriteString(" ID: " + ps.GetID() + "\n") + b.WriteString(" SubscriptionId: " + ps.GetID() + "\n") b.WriteString(" Resource: " + ps.GetResource() + "\n") return b.String() } diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go index 3c8f3fd..ce8e82c 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go @@ -49,7 +49,7 @@ type Subscriber struct { SubStore *store.PubSubStore `json:"subStore" omit:"empty"` // EndPointURI - A URI describing the subscriber link . // +required - EndPointURI *types.URI `json:"endPointURI" omit:"empty"` + EndPointURI *types.URI `json:"EndpointUri" omit:"empty"` // Status ... Status Status `json:"status" omit:"empty"` // Action ... @@ -84,8 +84,8 @@ func (s *Subscriber) FailedCount() int { // String returns a pretty-printed representation of the Event. func (s *Subscriber) String() string { b := strings.Builder{} - b.WriteString(" EndPointURI: " + s.GetEndPointURI() + "\n") - b.WriteString(" ID: " + s.GetClientID().String() + "\n") + b.WriteString(" EndpointUri: " + s.GetEndPointURI() + "\n") + b.WriteString(" clientID: " + s.GetClientID().String() + "\n") b.WriteString(" sub :{") if s.SubStore != nil { for _, v := range s.SubStore.Store { diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go index 98bcf43..48fcbb6 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go @@ -31,13 +31,12 @@ func (s *Subscriber) GetSubStore() *store.PubSubStore { // CreateCloudEvents ... func (s *Subscriber) CreateCloudEvents() (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSpecVersion(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetType(channel.SUBSCRIBER.String()) ce.SetSource("subscription-request") ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevents.ApplicationJSON, s); err != nil { + if err := ce.SetData("", s); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go index 5b9bad9..9d3d3b8 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go @@ -42,12 +42,6 @@ func (s *Subscriber) SetStatus(status Status) { // AddSubscription ... func (s *Subscriber) AddSubscription(subs ...pubsub.PubSub) { for _, ss := range subs { - newS := &pubsub.PubSub{ - ID: ss.GetID(), - EndPointURI: nil, - URILocation: nil, - Resource: ss.Resource, - } - s.SubStore.Store[ss.GetID()] = newS + s.SubStore.Store[ss.GetID()] = &ss } } diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go b/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go index 93786f4..c9b9d96 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go @@ -85,15 +85,13 @@ func SendCloudEventsToDataChannel(inChan chan<- *channel.DataChan, status channe // CreateCloudEvents create new cloud event from cloud native events and pubsub func CreateCloudEvents(e event.Event, ps pubsub.PubSub) (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSubject(e.Source) // subject is set to source of the event object ce.SetSource(ps.Resource) // bus address - ce.SetSpecVersion(cloudevents.VersionV03) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevents.ApplicationJSON, e.GetData()); err != nil { + if err := ce.SetData("", e.GetData()); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go index 6fb59c8..93b1f55 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go @@ -201,8 +201,8 @@ func (p *API) GetSubscriptionsFromFile(clientID uuid.UUID) ([]byte, error) { return b, err } -// GetSubscriptions get all subscriptionOne inforamtions -func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { +// GetSubscriptionsFromClient get all subs from the client +func (p *API) GetSubscriptionsFromClientID(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { if subs, ok := p.SubscriberStore.Get(clientID); ok { sub = subs.SubStore.Store } @@ -210,12 +210,25 @@ func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSu return } -// GetSubscription get subscriptionOne inforamtions -func (p *API) GetSubscription(clientID uuid.UUID, subID string) pubsub.PubSub { +// GetSubscriptions get all subs +func (p *API) GetSubscriptions() ([]byte, error) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + var allSubs []pubsub.PubSub + for _, s := range p.SubscriberStore.Store { + for _, sub := range s.SubStore.Store { + allSubs = append(allSubs, *sub) + } + } + return json.MarshalIndent(&allSubs, "", " ") +} + +// GetSubscription get sub info from clientID and subID +func (p *API) GetSubscription(clientID uuid.UUID, subID string) (pubsub.PubSub, error) { if subs, ok := p.SubscriberStore.Get(clientID); ok { - return subs.Get(subID) + return subs.Get(subID), nil } - return pubsub.PubSub{} + return pubsub.PubSub{}, fmt.Errorf("subscription data was not found for id %s", subID) } // GetSubscriberURLByResourceAndClientID get subscription information by client id/resource diff --git a/vendor/modules.txt b/vendor/modules.txt index ac0647f..6090f78 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -71,7 +71,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 +# github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go ## explicit; go 1.22 github.com/redhat-cne/sdk-go/pkg/channel github.com/redhat-cne/sdk-go/pkg/common