diff --git a/go.mod b/go.mod index 7765270..ea58dff 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,14 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/prometheus/client_golang v1.14.0 - github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac + github.com/redhat-cne/sdk-go v1.0.1-unpublished github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.7.0 ) +replace github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go + require ( github.com/BurntSushi/toml v0.3.1 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -23,6 +25,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect diff --git a/go.sum b/go.sum index 4b03859..210e666 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac h1:I9dfgug3GIViEQxgKrLdWB8Y3mqVe0alismyTOdTz6w= -github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= diff --git a/pkg/restclient/client.go b/pkg/restclient/client.go new file mode 100644 index 0000000..5e02ca7 --- /dev/null +++ b/pkg/restclient/client.go @@ -0,0 +1,179 @@ +// Copyright 2020 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package restclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + ce "github.com/cloudevents/sdk-go/v2/event" + "github.com/redhat-cne/sdk-go/pkg/types" + log "github.com/sirupsen/logrus" + + "golang.org/x/net/context" +) + +var ( + httpTimeout = 2 * time.Second +) + +// Rest client to make http request +type Rest struct { + client http.Client +} + +// New get new rest client +func New() *Rest { + return &Rest{ + client: http.Client{ + Timeout: httpTimeout, + }, + } +} + +// PostEvent post an event to the give url and check for error +func (r *Rest) PostCloudEvent(url *types.URI, e ce.Event) (status int, err error) { + b, err := json.Marshal(e) + if err != nil { + log.Errorf("error marshalling event %v", e) + return status, err + } + + if status = r.Post(url, b); status == http.StatusBadRequest { + return status, fmt.Errorf("post returned status %d", status) + } + return status, nil +} + +// Post with data +func (r *Rest) Post(url *types.URI, data []byte) int { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(data)) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest + } + request.Header.Set("content-type", "application/json") + response, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v", err) + return http.StatusBadRequest + } + if response.Body != nil { + defer response.Body.Close() + // read any content and print + body, readErr := io.ReadAll(response.Body) + if readErr == nil && len(body) > 0 { + log.Debugf("%s return response %s\n", url.String(), string(body)) + } + } + return response.StatusCode +} + +// PostWithReturn post with data and return data +func (r *Rest) PostWithReturn(url *types.URI, data []byte) (int, []byte) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(data)) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest, nil + } + request.Header.Set("content-type", "application/json") + res, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v to %s ", err, url) + return http.StatusBadRequest, nil + } + if res.Body != nil { + defer res.Body.Close() + } + + body, readErr := io.ReadAll(res.Body) + if readErr != nil { + return http.StatusBadRequest, nil + } + return res.StatusCode, body +} + +// Put http request +func (r *Rest) Put(url *types.URI) int { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "PUT", url.String(), nil) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest + } + request.Header.Set("content-type", "application/json") + res, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v to %s ", err, url) + return http.StatusBadRequest + } + defer res.Body.Close() + return res.StatusCode +} + +// Delete http request +func (r *Rest) Delete(url *types.URI) int { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "DELETE", url.String(), nil) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest + } + request.Header.Set("content-type", "application/json") + res, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v to %s ", err, url) + return http.StatusBadRequest + } + defer res.Body.Close() + return res.StatusCode +} + +// Get http request +func (r *Rest) Get(url *types.URI) (int, string) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest, fmt.Sprintf("error creating post request %v", err) + } + request.Header.Set("content-type", "application/json") + res, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v to %s ", err, url) + return http.StatusBadRequest, fmt.Sprintf("error in post response %v to %s ", err, url) + } + defer res.Body.Close() + if body, readErr := io.ReadAll(res.Body); readErr == nil { + return res.StatusCode, string(body) + } + return http.StatusBadRequest, fmt.Sprintf("error in post response %v to %s ", err, url) +} diff --git a/server_test.go b/server_test.go index f8a9ed6..f72c7ee 100644 --- a/server_test.go +++ b/server_test.go @@ -153,7 +153,7 @@ func TestServer_CreateSubscription(t *testing.T) { // create subscription sub := api.NewPubSub( &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource) + resource, "1.0") data, err := json.Marshal(&sub) assert.Nil(t, err) @@ -295,7 +295,7 @@ func TestServer_GetCurrentState_withSubscription(t *testing.T) { // create subscription sub := api.NewPubSub( &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource) + resource, "1.0") data, err := json.Marshal(&sub) assert.Nil(t, err) diff --git a/v2/routes.go b/v2/routes.go index 380bf7b..64d9df5 100644 --- a/v2/routes.go +++ b/v2/routes.go @@ -21,25 +21,23 @@ import ( "strings" "time" - "github.com/redhat-cne/sdk-go/pkg/types" - "github.com/redhat-cne/rest-api/pkg/localmetrics" + "github.com/redhat-cne/rest-api/pkg/restclient" + "github.com/redhat-cne/sdk-go/pkg/channel" + cne "github.com/redhat-cne/sdk-go/pkg/event" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/subscriber" + "github.com/redhat-cne/sdk-go/pkg/types" + "github.com/redhat-cne/sdk-go/v1/event" cloudevents "github.com/cloudevents/sdk-go/v2" ce "github.com/cloudevents/sdk-go/v2/event" - cne "github.com/redhat-cne/sdk-go/pkg/event" - "github.com/redhat-cne/sdk-go/pkg/pubsub" - - "github.com/redhat-cne/sdk-go/v1/event" + "net/http" "github.com/google/uuid" "github.com/gorilla/mux" - - "github.com/redhat-cne/sdk-go/pkg/channel" - - "log" - "net/http" + log "github.com/sirupsen/logrus" ) // createSubscription create subscription and send it to a channel that is shared by middleware to process @@ -52,7 +50,6 @@ import ( // 204: noContent func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() - var response *http.Response bodyBytes, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -60,42 +57,118 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { } sub := pubsub.PubSub{} if err = json.Unmarshal(bodyBytes, &sub); err != nil { - respondWithError(w, "marshalling error") + respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err)) localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) return } - if sub.GetEndpointURI() != "" { - response, err = s.HTTPClient.Post(sub.GetEndpointURI(), cloudevents.ApplicationJSON, nil) - if err != nil { - log.Printf("there was error validating endpointurl %v, subscription wont be created", err) - localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) - respondWithError(w, err.Error()) - return - } - defer response.Body.Close() - if response.StatusCode != http.StatusNoContent { - log.Printf("there was an error validating endpointurl %s returned status code %d", sub.GetEndpointURI(), response.StatusCode) - respondWithError(w, "return url validation check failed for create subscription.check endpointURI") - localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) - return - } + endPointURI := sub.GetEndpointURI() + if endPointURI == "" { + respondWithStatusCode(w, http.StatusBadRequest, "EndpointURI can not be empty") + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return } - // check sub.EndpointURI by get - sub.SetID(uuid.New().String()) - _ = sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck + if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok { + respondWithStatusCode(w, http.StatusConflict, + fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation", + subExists.GetID())) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + + id := uuid.New().String() + sub.SetID(id) + sub.SetVersion(API_VERSION) + sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck newSub, err := s.pubSubAPI.CreateSubscription(sub) if err != nil { - log.Printf("error creating subscription %v", err) + respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err)) localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) - respondWithError(w, err.Error()) return } - log.Printf("subscription created successfully.") - // go ahead and create QDR to this address - s.sendOut(channel.SUBSCRIBER, &newSub) - localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1) - respondWithJSON(w, http.StatusCreated, newSub) + addr := newSub.GetResource() + + // this is placeholder not sending back to report + out := channel.DataChan{ + Address: addr, + // ClientID is not used + ClientID: uuid.New(), + Status: channel.NEW, + Type: channel.STATUS, // could be new event of new subscriber (sender) + } + + e, _ := out.CreateCloudEvents(CURRENTSTATE) + e.SetSource(addr) + + if s.statusReceiveOverrideFn == nil { + respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined") + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + + if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { + respondWithStatusCode(w, http.StatusNotFound, statusErr.Error()) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + + if out.Data == nil { + respondWithStatusCode(w, http.StatusNotFound, "event not found") + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + log.Error("event not found") + } + + restClient := restclient.New() + out.Data.SetID(newSub.ID) // set ID to the subscriptionID + status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data) + if err != nil { + respondWithStatusCode(w, http.StatusBadRequest, + fmt.Sprintf("failed to POST initial notification: %v, subscription wont be created", err)) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + if status != http.StatusNoContent { + respondWithStatusCode(w, http.StatusBadRequest, + fmt.Sprintf("initial notification returned wrong status code %d", status)) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + + log.Infof("initial notification is successful for subscription %s", addr) + // create unique clientId for each subscription based on endPointURI + subs := subscriber.New(s.getClientIDFromURI(endPointURI)) + _ = subs.SetEndPointURI(endPointURI) + + subs.AddSubscription(newSub) + subs.Action = channel.NEW + cevent, _ := subs.CreateCloudEvents() + cevent.SetSource(addr) + + // send this to dataOut channel to update configMap + out = channel.DataChan{ + Address: addr, + Data: cevent, + Status: channel.NEW, + Type: channel.SUBSCRIBER, + } + + var updatedObj *subscriber.Subscriber + // writes a file .json that has the same content as configMap. + // configMap was created later as a way to persist the data. + if updatedObj, err = s.subscriberAPI.CreateSubscription(subs.ClientID, *subs); err != nil { + out.Status = channel.FAILED + respondWithStatusCode(w, http.StatusNotFound, + fmt.Sprintf("failed creating subscription for %s, %v", subs.ClientID.String(), err)) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + } else { + out.Status = channel.SUCCESS + _ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj) + log.Infof("subscription created successfully.") + localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1) + respondWithJSON(w, http.StatusCreated, newSub) + } + + s.dataOut <- &out } // createPublisher create publisher and send it to a channel that is shared by middleware to process @@ -123,14 +196,14 @@ func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) { if pub.GetEndpointURI() != "" { response, err = s.HTTPClient.Post(pub.GetEndpointURI(), cloudevents.ApplicationJSON, nil) if err != nil { - log.Printf("there was an error validating the publisher endpointurl %v, publisher won't be created.", err) + log.Infof("there was an error validating the publisher endpointurl %v, publisher won't be created.", err) localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) respondWithError(w, err.Error()) return } defer response.Body.Close() if response.StatusCode != http.StatusNoContent { - log.Printf("there was an error validating endpointurl %s returned status code %d", pub.GetEndpointURI(), response.StatusCode) + log.Infof("there was an error validating endpointurl %s returned status code %d", pub.GetEndpointURI(), response.StatusCode) localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) respondWithError(w, "return url validation check failed for create publisher,check endpointURI") return @@ -142,12 +215,12 @@ func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) { _ = pub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "publishers", pub.ID)) //nolint:errcheck newPub, err := s.pubSubAPI.CreatePublisher(pub) if err != nil { - log.Printf("error creating publisher %v", err) + log.Infof("error creating publisher %v", err) localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) respondWithError(w, err.Error()) return } - log.Printf("publisher created successfully.") + log.Infof("publisher created successfully.") // go ahead and create QDR to this address s.sendOut(channel.PUBLISHER, &newPub) localmetrics.UpdatePublisherCount(localmetrics.ACTIVE, 1) @@ -180,12 +253,12 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) { queries := mux.Vars(r) subscriptionID, ok := queries["subscriptionid"] if !ok { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "") return } sub, err := s.pubSubAPI.GetSubscription(subscriptionID) if err != nil { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "") return } respondWithJSON(w, http.StatusOK, sub) @@ -251,12 +324,27 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil { localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) - respondWithError(w, err.Error()) + respondWithStatusCode(w, http.StatusNotFound, err.Error()) return } + // update configMap + for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) { + s.subscriberAPI.DeleteSubscription(c, subscriptionID) + } + + for _, subs := range s.subscriberAPI.SubscriberStore.Store { + cevent, _ := subs.CreateCloudEvents() + out := channel.DataChan{ + Data: cevent, + Status: channel.SUCCESS, + Type: channel.SUBSCRIBER, + } + s.dataOut <- &out + } + localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, -1) - respondWithMessage(w, http.StatusOK, "OK") + respondWithStatusCode(w, http.StatusNoContent, "") } func (s *Server) deleteAllSubscriptions(w http.ResponseWriter, _ *http.Request) { @@ -357,41 +445,36 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { respondWithError(w, "subscription not found") return } - cneEvent := event.CloudNativeEvent() - cneEvent.SetID(sub.ID) - cneEvent.Type = channel.STATUS.String() - cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) - cneEvent.SetDataContentType(cloudevents.ApplicationJSON) - cneEvent.SetData(cne.Data{ - Version: "v1", - }) - ceEvent, err := cneEvent.NewCloudEvent(sub) - if err != nil { - respondWithError(w, err.Error()) - } else { - // for http you send to the protocol address - statusChannel := make(chan *channel.StatusChan, 1) - s.dataOut <- &channel.DataChan{ - Type: channel.STATUS, - Data: ceEvent, - Address: sub.GetResource(), - StatusChan: statusChannel, - } - select { - case d := <-statusChannel: - if d.Data == nil || d.StatusCode != http.StatusOK { - if string(d.Message) == "" { - d.Message = []byte("event not found") - } - respondWithError(w, string(d.Message)) - } else { - respondWithJSON(w, d.StatusCode, *d.Data) - } - case <-time.After(5 * time.Second): - close(statusChannel) - respondWithError(w, "timeout waiting for status") + if resourceAddress == "" { + _ = json.NewEncoder(w).Encode(map[string]string{"message": "validation failed, resource is empty"}) + } + + if !strings.HasPrefix(resourceAddress, "/") { + resourceAddress = fmt.Sprintf("/%s", resourceAddress) + } + // this is placeholder not sending back to report + out := channel.DataChan{ + Address: resourceAddress, + // ClientID is not used + ClientID: uuid.New(), + Status: channel.NEW, + Type: channel.STATUS, // could be new event of new subscriber (sender) + } + + e, _ := out.CreateCloudEvents(CURRENTSTATE) + e.SetSource(resourceAddress) + // statusReceiveOverrideFn must return value for + if s.statusReceiveOverrideFn != nil { + if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { + respondWithError(w, statusErr.Error()) + } else if out.Data != nil { + respondWithJSON(w, http.StatusOK, *out.Data) + } else { + respondWithError(w, "event not found") } + } else { + respondWithError(w, "onReceive function not defined") } } @@ -445,10 +528,14 @@ func (s *Server) logEvent(w http.ResponseWriter, r *http.Request) { respondWithError(w, err.Error()) return } // check if publisher is found - log.Printf("event received %v", cneEvent) + log.Infof("event received %v", cneEvent) respondWithMessage(w, http.StatusAccepted, "Event published to log") } +func (s *Server) getClientIDFromURI(uri string) uuid.UUID { + return uuid.NewMD5(uuid.NameSpaceURL, []byte(uri)) +} + func dummy(w http.ResponseWriter, _ *http.Request) { respondWithMessage(w, http.StatusNoContent, "dummy test") } @@ -457,6 +544,15 @@ func respondWithError(w http.ResponseWriter, message string) { respondWithJSON(w, http.StatusBadRequest, map[string]string{"error": message}) } +func respondWithStatusCode(w http.ResponseWriter, code int, message string) { + if message != "" { + log.Errorf("%s", message) + // Response with message if spec were updated to allow message + // respondWithJSON(w, code, map[string]string{"error": message}) + } + w.WriteHeader(code) +} + func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) { if response, err := json.Marshal(payload); err == nil { w.Header().Set("Content-Type", cloudevents.ApplicationJSON) diff --git a/v2/server.go b/v2/server.go index be12d1d..63eeecf 100644 --- a/v2/server.go +++ b/v2/server.go @@ -5,7 +5,8 @@ // Terms Of Service: // // Schemes: http, https -// Host: localhost:8080 +// Host: localhost:8089 +// basePath: /api/ocloudNotifications/v1 // Version: 1.0.0 // Contact: Aneesh Puttur // @@ -25,12 +26,14 @@ import ( "sync" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/gorilla/mux" "github.com/redhat-cne/sdk-go/pkg/channel" "github.com/redhat-cne/sdk-go/pkg/event" "github.com/redhat-cne/sdk-go/pkg/pubsub" "github.com/redhat-cne/sdk-go/pkg/types" pubsubv1 "github.com/redhat-cne/sdk-go/v1/pubsub" + subscriberApi "github.com/redhat-cne/sdk-go/v1/subscriber" "io" "net/http" @@ -48,26 +51,32 @@ var healthCheckPause = 2 * time.Second type serverStatus int -const HTTPReadHeaderTimeout = 2 * time.Second +const ( + API_VERSION = "2.0" + HTTPReadHeaderTimeout = 2 * time.Second +) const ( starting = iota started notReady failed + CURRENTSTATE = "CurrentState" ) // Server defines rest routes server object type Server struct { port int apiPath string - //data out is transport in channel - dataOut chan<- *channel.DataChan - closeCh <-chan struct{} - HTTPClient *http.Client - httpServer *http.Server - pubSubAPI *pubsubv1.API - status serverStatus + //use dataOut chanel to write to configMap + dataOut chan<- *channel.DataChan + closeCh <-chan struct{} + HTTPClient *http.Client + httpServer *http.Server + pubSubAPI *pubsubv1.API + subscriberAPI *subscriberApi.API + status serverStatus + statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error } // publisher/subscription data model @@ -115,7 +124,9 @@ type swaggReqAccepted struct { //nolint:deadcode,unused } // InitServer is used to supply configurations for rest routes server -func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server { +func InitServer(port int, apiPath, storePath string, + dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, + onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server { once.Do(func() { ServerInstance = &Server{ port: port, @@ -129,7 +140,9 @@ func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.Dat }, Timeout: 10 * time.Second, }, - pubSubAPI: pubsubv1.GetAPIInstance(storePath), + pubSubAPI: pubsubv1.GetAPIInstance(storePath), + subscriberAPI: subscriberApi.GetAPIInstance(storePath), + statusReceiveOverrideFn: onStatusReceiveOverrideFn, } }) // singleton @@ -196,7 +209,7 @@ func (s *Server) Start() { api := r.PathPrefix(s.apiPath).Subrouter() // createSubscription create subscription and send it to a channel that is shared by middleware to process - // swagger:operation POST /subscriptions/ subscription createSubscription + // swagger:operation POST /subscriptions subscription createSubscription // --- // summary: Creates a new subscription. // description: If subscription creation is success(or if already exists), subscription will be returned with Created (201). @@ -237,6 +250,17 @@ func (s *Server) Start() { 404 Subscription resources are not available (not created). */ api.HandleFunc("/subscriptions", s.getSubscriptions).Methods(http.MethodGet) + //publishers create publisher and send it to a channel that is shared by middleware to process + // swagger:operation GET /publishers/ publishers getPublishers + // --- + // summary: Get publishers. + // description: If publisher creation is success(or if already exists), publisher will be returned with Created (201). + // parameters: + // responses: + // "200": + // "$ref": "#/responses/publishers" + // "404": + // "$ref": "#/responses/notFound" api.HandleFunc("/publishers", s.getPublishers).Methods(http.MethodGet) // 200 and 404 api.HandleFunc("/subscriptions/{subscriptionid}", s.getSubscriptionByID).Methods(http.MethodGet) @@ -322,8 +346,7 @@ func (s *Server) Start() { fmt.Fprintln(w, r) }) - log.Info("starting rest api server") - log.Infof("endpoint %s", s.apiPath) + log.Infof("starting v2 rest api server at port %d, endpoint %s", s.port, s.apiPath) go wait.Until(func() { s.status = started s.httpServer = &http.Server{ @@ -344,3 +367,8 @@ func (s *Server) Shutdown() { log.Warnf("trying to shutdown rest api sever, please use close channel to shutdown ") s.httpServer.Close() } + +// SetOnStatusReceiveOverrideFn ... sets receiver function +func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) { + s.statusReceiveOverrideFn = fn +} diff --git a/v2/server_test.go b/v2/server_test.go index f8a9ed6..f72c7ee 100644 --- a/v2/server_test.go +++ b/v2/server_test.go @@ -153,7 +153,7 @@ func TestServer_CreateSubscription(t *testing.T) { // create subscription sub := api.NewPubSub( &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource) + resource, "1.0") data, err := json.Marshal(&sub) assert.Nil(t, err) @@ -295,7 +295,7 @@ func TestServer_GetCurrentState_withSubscription(t *testing.T) { // create subscription sub := api.NewPubSub( &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource) + resource, "1.0") data, err := json.Marshal(&sub) assert.Nil(t, err) diff --git a/v2/swagger.json b/v2/swagger.json index a382929..e4337c9 100644 --- a/v2/swagger.json +++ b/v2/swagger.json @@ -19,7 +19,8 @@ }, "version": "1.0.0" }, - "host": "localhost:8080", + "host": "localhost:8089", + "basePath": "/api/ocloudNotifications/v1", "paths": { "/create/event/": { "post": { @@ -50,6 +51,22 @@ } }, "/publishers/": { + "get": { + "description": "If publisher creation is success(or if already exists), publisher will be returned with Created (201).", + "tags": [ + "publishers" + ], + "summary": "Get publishers.", + "operationId": "getPublishers", + "responses": { + "200": { + "$ref": "#/responses/publishers" + }, + "404": { + "$ref": "#/responses/notFound" + } + } + }, "post": { "description": "If publisher creation is success(or if already exists), publisher will be returned with Created (201).", "tags": [ @@ -77,7 +94,7 @@ } } }, - "/subscriptions/": { + "/subscriptions": { "post": { "description": "If subscription creation is success(or if already exists), subscription will be returned with Created (201).", "tags": [ @@ -104,12 +121,37 @@ } } } + }, + "/subscriptions/status": { + "post": { + "description": "If publisher status ping is success, call will be returned with status accepted.", + "tags": [ + "subscriptions" + ], + "summary": "Get status of publishing events.", + "operationId": "pingForSubscribedEventStatus", + "parameters": [ + { + "description": "subscription id to check status for", + "name": "subscriptionid" + } + ], + "responses": { + "201": { + "$ref": "#/responses/pubSubResp" + }, + "400": { + "$ref": "#/responses/badReq" + } + } + } } }, "definitions": { "Data": { - "description": "Data ... cloud native events data\nData Json payload is as follows,\n{\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/cluster/node/ptp\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/cluster/node/clock\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}]\n}", + "description": "{\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}, {\n\"resource\": \"/redfish/v1/Systems\",\n\"dataType\": \"notification\",\n\"valueType\": \"redfish-event\",\n\"value\": {\n\"@odata.context\": \"/redfish/v1/$metadata#Event.Event\",\n\"@odata.type\": \"#Event.v1_3_0.Event\",\n\"Context\": \"any string is valid\",\n\"Events\": [{\"EventId\": \"2162\", \"MemberId\": \"615703\", \"MessageId\": \"TMP0100\"}],\n\"Id\": \"5e004f5a-e3d1-11eb-ae9c-3448edf18a38\",\n\"Name\": \"Event Array\"\n}\n}]\n}", "type": "object", + "title": "Data ... cloud native events data\nData Json payload is as follows,", "properties": { "values": { "type": "array", @@ -131,9 +173,9 @@ "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" }, "DataValue": { - "description": "DataValue Json payload is as follows,\n{\n\"resource\": \"/cluster/node/ptp\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}", + "description": "{\n\"resource\": \"/cluster/node/ptp\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}", "type": "object", - "title": "DataValue ...", + "title": "DataValue ...\nDataValue Json payload is as follows,", "properties": { "dataType": { "$ref": "#/definitions/DataType" @@ -143,7 +185,6 @@ "x-go-name": "Resource" }, "value": { - "type": "object", "x-go-name": "Value" }, "valueType": { @@ -153,9 +194,9 @@ "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" }, "Event": { - "description": "Event Json payload is as follows,\n{\n\"id\": \"5ce55d17-9234-4fee-a589-d0f10cb32b8e\",\n\"type\": \"event.synchronization-state-chang\",\n\"time\": \"2021-02-05T17:31:00Z\",\n\"data\": {\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/cluster/node/ptp\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/cluster/node/clock\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}]\n}\n}\nEvent request model", + "description": "{\n\"id\": \"5ce55d17-9234-4fee-a589-d0f10cb32b8e\",\n\"type\": \"event.sync.sync-status.synchronization-state-change\",\n\"source\": \"/cluster/node/example.com/ptp/clock_realtime\",\n\"time\": \"2021-02-05T17:31:00Z\",\n\"data\": {\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}]\n}\n}\n\nEvent request model", "type": "object", - "title": "Event represents the canonical representation of a Cloud Native Event.", + "title": "Event represents the canonical representation of a Cloud Native Event.\nEvent Json payload is as follows,", "properties": { "data": { "$ref": "#/definitions/Data" @@ -173,8 +214,15 @@ "type": "string", "x-go-name": "ID" }, + "source": { + "description": "Source - The source of the occurrence which has happened.\n+required", + "type": "string", + "x-go-name": "Source" + }, "time": { - "$ref": "#/definitions/Timestamp" + "description": "Time - A Timestamp when the event happened.\n+required", + "type": "string", + "x-go-name": "Time" }, "type": { "description": "Type - The type of the occurrence which has happened.\n+required", @@ -185,9 +233,9 @@ "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" }, "PubSub": { - "description": "PubSub Json request payload is as follows,\n{\n\"id\": \"789be75d-7ac3-472e-bbbc-6d62878aad4a\",\n\"endpointUri\": \"http://localhost:9090/ack/event\",\n\"uriLocation\": \"http://localhost:8080/api/ocloudNotifications/v1/publishers/{publisherid}\",\n\"resource\": \"/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state\"\n}\nPubSub request model", + "description": "{\n\"id\": \"789be75d-7ac3-472e-bbbc-6d62878aad4a\",\n\"endpointUri\": \"http://localhost:9090/ack/event\",\n\"uriLocation\": \"http://localhost:8080/api/ocloudNotifications/v1/publishers/{publisherid}\",\n\"resource\": \"/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state\"\n}\n\nPubSub request model", "type": "object", - "title": "PubSub represents the canonical representation of a Cloud Native Event Publisher and Subscribers .", + "title": "PubSub represents the canonical representation of a Cloud Native Event Publisher and Sender .\nPubSub Json request payload is as follows,", "properties": { "endpointUri": { "$ref": "#/definitions/URI" @@ -208,11 +256,6 @@ }, "x-go-package": "github.com/redhat-cne/sdk-go/pkg/pubsub" }, - "Timestamp": { - "description": "Timestamp wraps time.Time to normalize the time layout to RFC3339. It is\nintended to enforce compliance with the Cloud Native events spec for their\ndefinition of Timestamp. Custom marshal methods are implemented to ensure\nthe outbound Timestamp is a string in the RFC3339 layout.", - "type": "object", - "x-go-package": "github.com/redhat-cne/sdk-go/pkg/types" - }, "URI": { "description": "URI is a wrapper to url.URL. It is intended to enforce compliance with\nthe Cloud Native Events spec for their definition of URI. Custom\nmarshal methods are implemented to ensure the outbound URI object\nis a flat string.", "type": "object", @@ -226,6 +269,9 @@ "Host": { "type": "string" }, + "OmitHost": { + "type": "boolean" + }, "Opaque": { "type": "string" }, diff --git a/vendor/github.com/pkg/errors/.gitignore b/vendor/github.com/pkg/errors/.gitignore new file mode 100644 index 0000000..daf913b --- /dev/null +++ b/vendor/github.com/pkg/errors/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/pkg/errors/.travis.yml b/vendor/github.com/pkg/errors/.travis.yml new file mode 100644 index 0000000..9159de0 --- /dev/null +++ b/vendor/github.com/pkg/errors/.travis.yml @@ -0,0 +1,10 @@ +language: go +go_import_path: github.com/pkg/errors +go: + - 1.11.x + - 1.12.x + - 1.13.x + - tip + +script: + - make check diff --git a/vendor/github.com/pkg/errors/LICENSE b/vendor/github.com/pkg/errors/LICENSE new file mode 100644 index 0000000..835ba3e --- /dev/null +++ b/vendor/github.com/pkg/errors/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2015, Dave Cheney +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/pkg/errors/Makefile b/vendor/github.com/pkg/errors/Makefile new file mode 100644 index 0000000..ce9d7cd --- /dev/null +++ b/vendor/github.com/pkg/errors/Makefile @@ -0,0 +1,44 @@ +PKGS := github.com/pkg/errors +SRCDIRS := $(shell go list -f '{{.Dir}}' $(PKGS)) +GO := go + +check: test vet gofmt misspell unconvert staticcheck ineffassign unparam + +test: + $(GO) test $(PKGS) + +vet: | test + $(GO) vet $(PKGS) + +staticcheck: + $(GO) get honnef.co/go/tools/cmd/staticcheck + staticcheck -checks all $(PKGS) + +misspell: + $(GO) get github.com/client9/misspell/cmd/misspell + misspell \ + -locale GB \ + -error \ + *.md *.go + +unconvert: + $(GO) get github.com/mdempsky/unconvert + unconvert -v $(PKGS) + +ineffassign: + $(GO) get github.com/gordonklaus/ineffassign + find $(SRCDIRS) -name '*.go' | xargs ineffassign + +pedantic: check errcheck + +unparam: + $(GO) get mvdan.cc/unparam + unparam ./... + +errcheck: + $(GO) get github.com/kisielk/errcheck + errcheck $(PKGS) + +gofmt: + @echo Checking code is gofmted + @test -z "$(shell gofmt -s -l -d -e $(SRCDIRS) | tee /dev/stderr)" diff --git a/vendor/github.com/pkg/errors/README.md b/vendor/github.com/pkg/errors/README.md new file mode 100644 index 0000000..54dfdcb --- /dev/null +++ b/vendor/github.com/pkg/errors/README.md @@ -0,0 +1,59 @@ +# errors [![Travis-CI](https://travis-ci.org/pkg/errors.svg)](https://travis-ci.org/pkg/errors) [![AppVeyor](https://ci.appveyor.com/api/projects/status/b98mptawhudj53ep/branch/master?svg=true)](https://ci.appveyor.com/project/davecheney/errors/branch/master) [![GoDoc](https://godoc.org/github.com/pkg/errors?status.svg)](http://godoc.org/github.com/pkg/errors) [![Report card](https://goreportcard.com/badge/github.com/pkg/errors)](https://goreportcard.com/report/github.com/pkg/errors) [![Sourcegraph](https://sourcegraph.com/github.com/pkg/errors/-/badge.svg)](https://sourcegraph.com/github.com/pkg/errors?badge) + +Package errors provides simple error handling primitives. + +`go get github.com/pkg/errors` + +The traditional error handling idiom in Go is roughly akin to +```go +if err != nil { + return err +} +``` +which applied recursively up the call stack results in error reports without context or debugging information. The errors package allows programmers to add context to the failure path in their code in a way that does not destroy the original value of the error. + +## Adding context to an error + +The errors.Wrap function returns a new error that adds context to the original error. For example +```go +_, err := ioutil.ReadAll(r) +if err != nil { + return errors.Wrap(err, "read failed") +} +``` +## Retrieving the cause of an error + +Using `errors.Wrap` constructs a stack of errors, adding context to the preceding error. Depending on the nature of the error it may be necessary to reverse the operation of errors.Wrap to retrieve the original error for inspection. Any error value which implements this interface can be inspected by `errors.Cause`. +```go +type causer interface { + Cause() error +} +``` +`errors.Cause` will recursively retrieve the topmost error which does not implement `causer`, which is assumed to be the original cause. For example: +```go +switch err := errors.Cause(err).(type) { +case *MyError: + // handle specifically +default: + // unknown error +} +``` + +[Read the package documentation for more information](https://godoc.org/github.com/pkg/errors). + +## Roadmap + +With the upcoming [Go2 error proposals](https://go.googlesource.com/proposal/+/master/design/go2draft.md) this package is moving into maintenance mode. The roadmap for a 1.0 release is as follows: + +- 0.9. Remove pre Go 1.9 and Go 1.10 support, address outstanding pull requests (if possible) +- 1.0. Final release. + +## Contributing + +Because of the Go2 errors changes, this package is not accepting proposals for new functionality. With that said, we welcome pull requests, bug fixes and issue reports. + +Before sending a PR, please discuss your change by raising an issue. + +## License + +BSD-2-Clause diff --git a/vendor/github.com/pkg/errors/appveyor.yml b/vendor/github.com/pkg/errors/appveyor.yml new file mode 100644 index 0000000..a932ead --- /dev/null +++ b/vendor/github.com/pkg/errors/appveyor.yml @@ -0,0 +1,32 @@ +version: build-{build}.{branch} + +clone_folder: C:\gopath\src\github.com\pkg\errors +shallow_clone: true # for startup speed + +environment: + GOPATH: C:\gopath + +platform: + - x64 + +# http://www.appveyor.com/docs/installed-software +install: + # some helpful output for debugging builds + - go version + - go env + # pre-installed MinGW at C:\MinGW is 32bit only + # but MSYS2 at C:\msys64 has mingw64 + - set PATH=C:\msys64\mingw64\bin;%PATH% + - gcc --version + - g++ --version + +build_script: + - go install -v ./... + +test_script: + - set PATH=C:\gopath\bin;%PATH% + - go test -v ./... + +#artifacts: +# - path: '%GOPATH%\bin\*.exe' +deploy: off diff --git a/vendor/github.com/pkg/errors/errors.go b/vendor/github.com/pkg/errors/errors.go new file mode 100644 index 0000000..161aea2 --- /dev/null +++ b/vendor/github.com/pkg/errors/errors.go @@ -0,0 +1,288 @@ +// Package errors provides simple error handling primitives. +// +// The traditional error handling idiom in Go is roughly akin to +// +// if err != nil { +// return err +// } +// +// which when applied recursively up the call stack results in error reports +// without context or debugging information. The errors package allows +// programmers to add context to the failure path in their code in a way +// that does not destroy the original value of the error. +// +// Adding context to an error +// +// The errors.Wrap function returns a new error that adds context to the +// original error by recording a stack trace at the point Wrap is called, +// together with the supplied message. For example +// +// _, err := ioutil.ReadAll(r) +// if err != nil { +// return errors.Wrap(err, "read failed") +// } +// +// If additional control is required, the errors.WithStack and +// errors.WithMessage functions destructure errors.Wrap into its component +// operations: annotating an error with a stack trace and with a message, +// respectively. +// +// Retrieving the cause of an error +// +// Using errors.Wrap constructs a stack of errors, adding context to the +// preceding error. Depending on the nature of the error it may be necessary +// to reverse the operation of errors.Wrap to retrieve the original error +// for inspection. Any error value which implements this interface +// +// type causer interface { +// Cause() error +// } +// +// can be inspected by errors.Cause. errors.Cause will recursively retrieve +// the topmost error that does not implement causer, which is assumed to be +// the original cause. For example: +// +// switch err := errors.Cause(err).(type) { +// case *MyError: +// // handle specifically +// default: +// // unknown error +// } +// +// Although the causer interface is not exported by this package, it is +// considered a part of its stable public interface. +// +// Formatted printing of errors +// +// All error values returned from this package implement fmt.Formatter and can +// be formatted by the fmt package. The following verbs are supported: +// +// %s print the error. If the error has a Cause it will be +// printed recursively. +// %v see %s +// %+v extended format. Each Frame of the error's StackTrace will +// be printed in detail. +// +// Retrieving the stack trace of an error or wrapper +// +// New, Errorf, Wrap, and Wrapf record a stack trace at the point they are +// invoked. This information can be retrieved with the following interface: +// +// type stackTracer interface { +// StackTrace() errors.StackTrace +// } +// +// The returned errors.StackTrace type is defined as +// +// type StackTrace []Frame +// +// The Frame type represents a call site in the stack trace. Frame supports +// the fmt.Formatter interface that can be used for printing information about +// the stack trace of this error. For example: +// +// if err, ok := err.(stackTracer); ok { +// for _, f := range err.StackTrace() { +// fmt.Printf("%+s:%d\n", f, f) +// } +// } +// +// Although the stackTracer interface is not exported by this package, it is +// considered a part of its stable public interface. +// +// See the documentation for Frame.Format for more details. +package errors + +import ( + "fmt" + "io" +) + +// New returns an error with the supplied message. +// New also records the stack trace at the point it was called. +func New(message string) error { + return &fundamental{ + msg: message, + stack: callers(), + } +} + +// Errorf formats according to a format specifier and returns the string +// as a value that satisfies error. +// Errorf also records the stack trace at the point it was called. +func Errorf(format string, args ...interface{}) error { + return &fundamental{ + msg: fmt.Sprintf(format, args...), + stack: callers(), + } +} + +// fundamental is an error that has a message and a stack, but no caller. +type fundamental struct { + msg string + *stack +} + +func (f *fundamental) Error() string { return f.msg } + +func (f *fundamental) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + io.WriteString(s, f.msg) + f.stack.Format(s, verb) + return + } + fallthrough + case 's': + io.WriteString(s, f.msg) + case 'q': + fmt.Fprintf(s, "%q", f.msg) + } +} + +// WithStack annotates err with a stack trace at the point WithStack was called. +// If err is nil, WithStack returns nil. +func WithStack(err error) error { + if err == nil { + return nil + } + return &withStack{ + err, + callers(), + } +} + +type withStack struct { + error + *stack +} + +func (w *withStack) Cause() error { return w.error } + +// Unwrap provides compatibility for Go 1.13 error chains. +func (w *withStack) Unwrap() error { return w.error } + +func (w *withStack) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + fmt.Fprintf(s, "%+v", w.Cause()) + w.stack.Format(s, verb) + return + } + fallthrough + case 's': + io.WriteString(s, w.Error()) + case 'q': + fmt.Fprintf(s, "%q", w.Error()) + } +} + +// Wrap returns an error annotating err with a stack trace +// at the point Wrap is called, and the supplied message. +// If err is nil, Wrap returns nil. +func Wrap(err error, message string) error { + if err == nil { + return nil + } + err = &withMessage{ + cause: err, + msg: message, + } + return &withStack{ + err, + callers(), + } +} + +// Wrapf returns an error annotating err with a stack trace +// at the point Wrapf is called, and the format specifier. +// If err is nil, Wrapf returns nil. +func Wrapf(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + err = &withMessage{ + cause: err, + msg: fmt.Sprintf(format, args...), + } + return &withStack{ + err, + callers(), + } +} + +// WithMessage annotates err with a new message. +// If err is nil, WithMessage returns nil. +func WithMessage(err error, message string) error { + if err == nil { + return nil + } + return &withMessage{ + cause: err, + msg: message, + } +} + +// WithMessagef annotates err with the format specifier. +// If err is nil, WithMessagef returns nil. +func WithMessagef(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + return &withMessage{ + cause: err, + msg: fmt.Sprintf(format, args...), + } +} + +type withMessage struct { + cause error + msg string +} + +func (w *withMessage) Error() string { return w.msg + ": " + w.cause.Error() } +func (w *withMessage) Cause() error { return w.cause } + +// Unwrap provides compatibility for Go 1.13 error chains. +func (w *withMessage) Unwrap() error { return w.cause } + +func (w *withMessage) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + fmt.Fprintf(s, "%+v\n", w.Cause()) + io.WriteString(s, w.msg) + return + } + fallthrough + case 's', 'q': + io.WriteString(s, w.Error()) + } +} + +// Cause returns the underlying cause of the error, if possible. +// An error value has a cause if it implements the following +// interface: +// +// type causer interface { +// Cause() error +// } +// +// If the error does not implement Cause, the original error will +// be returned. If the error is nil, nil will be returned without further +// investigation. +func Cause(err error) error { + type causer interface { + Cause() error + } + + for err != nil { + cause, ok := err.(causer) + if !ok { + break + } + err = cause.Cause() + } + return err +} diff --git a/vendor/github.com/pkg/errors/go113.go b/vendor/github.com/pkg/errors/go113.go new file mode 100644 index 0000000..be0d10d --- /dev/null +++ b/vendor/github.com/pkg/errors/go113.go @@ -0,0 +1,38 @@ +// +build go1.13 + +package errors + +import ( + stderrors "errors" +) + +// Is reports whether any error in err's chain matches target. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +func Is(err, target error) bool { return stderrors.Is(err, target) } + +// As finds the first error in err's chain that matches target, and if so, sets +// target to that error value and returns true. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error matches target if the error's concrete value is assignable to the value +// pointed to by target, or if the error has a method As(interface{}) bool such that +// As(target) returns true. In the latter case, the As method is responsible for +// setting target. +// +// As will panic if target is not a non-nil pointer to either a type that implements +// error, or to any interface type. As returns false if err is nil. +func As(err error, target interface{}) bool { return stderrors.As(err, target) } + +// Unwrap returns the result of calling the Unwrap method on err, if err's +// type contains an Unwrap method returning error. +// Otherwise, Unwrap returns nil. +func Unwrap(err error) error { + return stderrors.Unwrap(err) +} diff --git a/vendor/github.com/pkg/errors/stack.go b/vendor/github.com/pkg/errors/stack.go new file mode 100644 index 0000000..779a834 --- /dev/null +++ b/vendor/github.com/pkg/errors/stack.go @@ -0,0 +1,177 @@ +package errors + +import ( + "fmt" + "io" + "path" + "runtime" + "strconv" + "strings" +) + +// Frame represents a program counter inside a stack frame. +// For historical reasons if Frame is interpreted as a uintptr +// its value represents the program counter + 1. +type Frame uintptr + +// pc returns the program counter for this frame; +// multiple frames may have the same PC value. +func (f Frame) pc() uintptr { return uintptr(f) - 1 } + +// file returns the full path to the file that contains the +// function for this Frame's pc. +func (f Frame) file() string { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return "unknown" + } + file, _ := fn.FileLine(f.pc()) + return file +} + +// line returns the line number of source code of the +// function for this Frame's pc. +func (f Frame) line() int { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return 0 + } + _, line := fn.FileLine(f.pc()) + return line +} + +// name returns the name of this function, if known. +func (f Frame) name() string { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return "unknown" + } + return fn.Name() +} + +// Format formats the frame according to the fmt.Formatter interface. +// +// %s source file +// %d source line +// %n function name +// %v equivalent to %s:%d +// +// Format accepts flags that alter the printing of some verbs, as follows: +// +// %+s function name and path of source file relative to the compile time +// GOPATH separated by \n\t (\n\t) +// %+v equivalent to %+s:%d +func (f Frame) Format(s fmt.State, verb rune) { + switch verb { + case 's': + switch { + case s.Flag('+'): + io.WriteString(s, f.name()) + io.WriteString(s, "\n\t") + io.WriteString(s, f.file()) + default: + io.WriteString(s, path.Base(f.file())) + } + case 'd': + io.WriteString(s, strconv.Itoa(f.line())) + case 'n': + io.WriteString(s, funcname(f.name())) + case 'v': + f.Format(s, 's') + io.WriteString(s, ":") + f.Format(s, 'd') + } +} + +// MarshalText formats a stacktrace Frame as a text string. The output is the +// same as that of fmt.Sprintf("%+v", f), but without newlines or tabs. +func (f Frame) MarshalText() ([]byte, error) { + name := f.name() + if name == "unknown" { + return []byte(name), nil + } + return []byte(fmt.Sprintf("%s %s:%d", name, f.file(), f.line())), nil +} + +// StackTrace is stack of Frames from innermost (newest) to outermost (oldest). +type StackTrace []Frame + +// Format formats the stack of Frames according to the fmt.Formatter interface. +// +// %s lists source files for each Frame in the stack +// %v lists the source file and line number for each Frame in the stack +// +// Format accepts flags that alter the printing of some verbs, as follows: +// +// %+v Prints filename, function, and line number for each Frame in the stack. +func (st StackTrace) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + switch { + case s.Flag('+'): + for _, f := range st { + io.WriteString(s, "\n") + f.Format(s, verb) + } + case s.Flag('#'): + fmt.Fprintf(s, "%#v", []Frame(st)) + default: + st.formatSlice(s, verb) + } + case 's': + st.formatSlice(s, verb) + } +} + +// formatSlice will format this StackTrace into the given buffer as a slice of +// Frame, only valid when called with '%s' or '%v'. +func (st StackTrace) formatSlice(s fmt.State, verb rune) { + io.WriteString(s, "[") + for i, f := range st { + if i > 0 { + io.WriteString(s, " ") + } + f.Format(s, verb) + } + io.WriteString(s, "]") +} + +// stack represents a stack of program counters. +type stack []uintptr + +func (s *stack) Format(st fmt.State, verb rune) { + switch verb { + case 'v': + switch { + case st.Flag('+'): + for _, pc := range *s { + f := Frame(pc) + fmt.Fprintf(st, "\n%+v", f) + } + } + } +} + +func (s *stack) StackTrace() StackTrace { + f := make([]Frame, len(*s)) + for i := 0; i < len(f); i++ { + f[i] = Frame((*s)[i]) + } + return f +} + +func callers() *stack { + const depth = 32 + var pcs [depth]uintptr + n := runtime.Callers(3, pcs[:]) + var st stack = pcs[0:n] + return &st +} + +// funcname removes the path prefix component of a function's name reported by func.Name(). +func funcname(name string) string { + i := strings.LastIndex(name, "/") + name = name[i+1:] + i = strings.Index(name, ".") + return name[i+1:] +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/common/common.go b/vendor/github.com/redhat-cne/sdk-go/pkg/common/common.go new file mode 100644 index 0000000..41337be --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/common/common.go @@ -0,0 +1,54 @@ +// Copyright 2020 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "strconv" + "strings" + + log "github.com/sirupsen/logrus" +) + +// REST API versions +const ( + V1 = "1.0" + V2 = "2.0" + NONCOMPLIANT = "not compliant with O-Cloud Notification API V3.0" +) + +func getMajorVersion(version string) (int, error) { + if version == "" { + return 1, nil + } + version = strings.TrimPrefix(version, "v") + version = strings.TrimPrefix(version, "V") + v := strings.Split(version, ".") + majorVersion, err := strconv.Atoi(v[0]) + if err != nil { + log.Errorf("Error parsing major version from %s, %v", version, err) + return 1, err + } + return majorVersion, nil +} + +func IsV1Api(version string) bool { + if majorVersion, err := getMajorVersion(version); err == nil { + if majorVersion >= 2 { + return false + } + } + // by default use V1 + return true +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go index 5fe66d4..5f0508c 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go @@ -20,6 +20,7 @@ import ( cloudevent "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/common" "github.com/redhat-cne/sdk-go/pkg/pubsub" ) @@ -28,8 +29,10 @@ func (e *Event) NewCloudEvent(ps *pubsub.PubSub) (*cloudevent.Event, error) { ce := cloudevent.NewEvent(cloudevent.VersionV03) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetDataContentType(cloudevent.ApplicationJSON) - ce.SetSubject(e.Source) // subject is set to source of the event object + if common.IsV1Api(e.Data.GetVersion()) { + ce.SetDataContentType(cloudevent.ApplicationJSON) + ce.SetSubject(e.Source) // subject is set to source of the event object + } ce.SetSource(ps.Resource) // bus address ce.SetSpecVersion(cloudevent.VersionV03) ce.SetID(uuid.New().String()) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_data.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_data.go index 5130996..947600f 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_data.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_data.go @@ -17,6 +17,8 @@ package event import ( "fmt" "regexp" + + "github.com/redhat-cne/sdk-go/pkg/common" ) // DataType ... @@ -85,9 +87,9 @@ type Data struct { // "value": "ACQUIRING-SYNC" // } type DataValue struct { - Resource string `json:"resource" example:"/cluster/node/clock"` - DataType DataType `json:"dataType" example:"metric"` - ValueType ValueType `json:"valueType" example:"decimal64.3"` + Resource string `json:"ResourceAddress" example:"/cluster/node/clock"` + DataType DataType `json:"data_type" example:"metric"` + ValueType ValueType `json:"value_type" example:"decimal64.3"` Value interface{} `json:"value" example:"100.3"` } @@ -136,3 +138,28 @@ func (v *DataValue) SetResource(r string) error { } return nil } + +func (d *Data) GetResourceName() string { + if common.IsV1Api(d.Version) { + return "resource" + } + return "ResourceAddress" +} + +func (d *Data) GetDataTypeName() string { + if common.IsV1Api(d.Version) { + return "dataType" + } + return "data_type" +} + +func (d *Data) GetValueTypeName() string { + if common.IsV1Api(d.Version) { + return "valueType" + } + return "value_type" +} + +func (d *Data) GetValueName() string { + return "value" +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_marshal.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_marshal.go index 6989728..0638bdc 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_marshal.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_marshal.go @@ -123,16 +123,16 @@ func writeJSONData(in *Data, writer io.Writer, stream *jsoniter.Stream) error { } count++ stream.WriteObjectStart() - stream.WriteObjectField("resource") + stream.WriteObjectField(in.GetResourceName()) stream.WriteString(v.GetResource()) stream.WriteMore() - stream.WriteObjectField("dataType") + stream.WriteObjectField(in.GetDataTypeName()) stream.WriteString(string(v.DataType)) stream.WriteMore() - stream.WriteObjectField("valueType") + stream.WriteObjectField(in.GetValueTypeName()) stream.WriteString(string(v.ValueType)) stream.WriteMore() - stream.WriteObjectField("value") + stream.WriteObjectField(in.GetValueName()) switch v.ValueType { case ENUMERATION: // if type is a string diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_unmarshal.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_unmarshal.go index dd55066..4ca750a 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_unmarshal.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_unmarshal.go @@ -49,12 +49,12 @@ func ReadDataJSON(out *Data, reader io.Reader) error { func readDataJSONFromIterator(out *Data, iterator *jsoniter.Iterator) error { var ( // Universally parseable fields. - version string - data []DataValue + data []DataValue // These fields require knowledge about the specversion to be parsed. //schemaurl jsoniter.Any ) +READ_VERSION: for key := iterator.ReadObject(); key != ""; key = iterator.ReadObject() { // Check if we have some error in our error cache if iterator.Error != nil { @@ -64,9 +64,23 @@ func readDataJSONFromIterator(out *Data, iterator *jsoniter.Iterator) error { // If no specversion ... switch key { case "version": - version = iterator.ReadString() + out.Version = iterator.ReadString() + break READ_VERSION + default: + iterator.Skip() + } + } + + for key := iterator.ReadObject(); key != ""; key = iterator.ReadObject() { + // Check if we have some error in our error cache + if iterator.Error != nil { + return iterator.Error + } + + // If no specversion ... + switch key { case "values": - data, _ = readDataValue(iterator) + data, _ = readDataValue(iterator, out) default: iterator.Skip() @@ -76,7 +90,6 @@ func readDataJSONFromIterator(out *Data, iterator *jsoniter.Iterator) error { if iterator.Error != nil { return iterator.Error } - out.Version = version out.Values = data return nil } @@ -148,7 +161,7 @@ func readTimestamp(iter *jsoniter.Iterator) *types.Timestamp { return t } -func readDataValue(iter *jsoniter.Iterator) ([]DataValue, error) { +func readDataValue(iter *jsoniter.Iterator, d *Data) ([]DataValue, error) { var values []DataValue var err error for iter.ReadArray() { @@ -156,13 +169,13 @@ func readDataValue(iter *jsoniter.Iterator) ([]DataValue, error) { dv := DataValue{} for dvField := iter.ReadObject(); dvField != ""; dvField = iter.ReadObject() { switch dvField { - case "resource": + case d.GetResourceName(): dv.Resource = iter.ReadString() - case "dataType": + case d.GetDataTypeName(): dv.DataType = DataType(iter.ReadString()) - case "valueType": + case d.GetValueTypeName(): dv.ValueType = ValueType(iter.ReadString()) - case "value": + case d.GetValueName(): cacheValue = iter.Read() default: iter.Skip() @@ -199,6 +212,7 @@ func readData(iter *jsoniter.Iterator) (*Data, error) { Values: []DataValue{}, } +READ_VERSION: for key := iter.ReadObject(); key != ""; key = iter.ReadObject() { // Check if we have some error in our error cache if iter.Error != nil { @@ -207,8 +221,20 @@ func readData(iter *jsoniter.Iterator) (*Data, error) { switch key { case "version": data.Version = iter.ReadString() + break READ_VERSION + default: + iter.Skip() + } + } + + for key := iter.ReadObject(); key != ""; key = iter.ReadObject() { + // Check if we have some error in our error cache + if iter.Error != nil { + return data, iter.Error + } + switch key { case "values": - values, err := readDataValue(iter) + values, err := readDataValue(iter, data) if err != nil { return data, err } diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go index 26cdcfd..428c9bf 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go @@ -17,6 +17,7 @@ package pubsub import ( "strings" + "github.com/redhat-cne/sdk-go/pkg/common" "github.com/redhat-cne/sdk-go/pkg/types" ) @@ -32,25 +33,54 @@ import ( // // PubSub request model type PubSub struct { + Version string `json:"version" omit:"1.0"` // ID of the pub/sub; is updated on successful creation of publisher/subscription. - ID string `json:"id" omit:"empty"` + ID string `json:"SubscriptionId" omit:"empty"` // EndPointURI - A URI describing the event action link. // +required - EndPointURI *types.URI `json:"endpointUri" example:"http://localhost:9090/ack/event" omit:"empty"` + EndPointURI *types.URI `json:"EndpointUri" example:"http://localhost:9090/ack/event" omit:"empty"` // URILocation - A URI describing the producer/subscription get link. - URILocation *types.URI `json:"uriLocation" omit:"empty"` + URILocation *types.URI `json:"UriLocation" omit:"empty"` // Resource - The type of the Resource. // +required - Resource string `json:"resource" example:"/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state"` + Resource string `json:"ResourceAddress" example:"/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state"` } // String returns a pretty-printed representation of the Event. func (ps *PubSub) String() string { b := strings.Builder{} - b.WriteString(" EndpointURI: " + ps.GetEndpointURI() + "\n") - b.WriteString(" URILocation: " + ps.GetURILocation() + "\n") + b.WriteString(" EndpointUri: " + ps.GetEndpointURI() + "\n") + b.WriteString(" UriLocation: " + ps.GetURILocation() + "\n") b.WriteString(" ID: " + ps.GetID() + "\n") b.WriteString(" Resource: " + ps.GetResource() + "\n") return b.String() } + +func (ps *PubSub) GetIDName() string { + if common.IsV1Api(ps.Version) { + return "id" + } + return "SubscriptionId" +} + +func (ps *PubSub) GetEndpointURIName() string { + if common.IsV1Api(ps.Version) { + return "endpointUri" + } + return "EndpointUri" +} + +func (ps *PubSub) GetURILocationName() string { + if common.IsV1Api(ps.Version) { + return "uriLocation" + } + return "UriLocation" +} + +func (ps *PubSub) GetResourceName() string { + if common.IsV1Api(ps.Version) { + return "resource" + } + return "ResourceAddress" +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_interface.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_interface.go index ccbdfd6..351176a 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_interface.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_interface.go @@ -16,6 +16,8 @@ package pubsub // Reader is the interface for reading through an event from attributes. type Reader interface { + // GetVersion returns event.GetVersion() + GetVersion() string // GetResource returns event.GetResource() GetResource() string // GetEndpointUri returns event.GetEndpointUri() @@ -31,6 +33,8 @@ type Reader interface { // If an error is thrown by a sub-component, Writer caches the error // internally and exposes errors with a call to Writer.Validate(). type Writer interface { + // SetVersion performs event.SetVersion() + SetVersion(string) // SetResource performs event.SetResource() SetResource(string) error // SetEndpointURI [erforms] event.SetEndpointURI() diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_marshal.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_marshal.go new file mode 100644 index 0000000..357bb0d --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_marshal.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "bytes" + "fmt" + "io" + + jsoniter "github.com/json-iterator/go" +) + +// WriteJSON writes the in event in the provided writer. +// Note: this function assumes the input event is valid. +func WriteJSON(in *PubSub, writer io.Writer) error { + stream := jsoniter.ConfigFastest.BorrowStream(writer) + defer jsoniter.ConfigFastest.ReturnStream(stream) + stream.WriteObjectStart() + + stream.WriteObjectField(in.GetResourceName()) + stream.WriteString(in.GetResource()) + + stream.WriteMore() + stream.WriteObjectField(in.GetEndpointURIName()) + stream.WriteString(in.GetEndpointURI()) + + if in.GetID() != "" { + stream.WriteMore() + stream.WriteObjectField(in.GetIDName()) + stream.WriteString(in.GetID()) + } + + if in.GetURILocation() != "" { + stream.WriteMore() + stream.WriteObjectField(in.GetURILocationName()) + stream.WriteString(in.GetURILocation()) + } + + // Let's do a check on the error + if stream.Error != nil { + return fmt.Errorf("error while writing the event attributes: %w", stream.Error) + } + + stream.WriteObjectEnd() + return stream.Flush() +} + +// MarshalJSON implements a custom json marshal method used when this type is +// marshaled using json.Marshal. +func (d PubSub) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + err := WriteJSON(&d, &buf) + return buf.Bytes(), err +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_reader.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_reader.go index 1390f50..7be8809 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_reader.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_reader.go @@ -16,6 +16,11 @@ package pubsub var _ Reader = (*PubSub)(nil) +// GetResource implements EventReader.Version +func (ps *PubSub) GetVersion() string { + return ps.Version +} + // GetResource implements EventReader.Resource func (ps *PubSub) GetResource() string { return ps.Resource diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_unmarshal.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_unmarshal.go new file mode 100644 index 0000000..80f6a38 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_unmarshal.go @@ -0,0 +1,98 @@ +package pubsub + +import ( + "io" + "sync" + + jsoniter "github.com/json-iterator/go" + "github.com/redhat-cne/sdk-go/pkg/common" + log "github.com/sirupsen/logrus" +) + +var iterPool = sync.Pool{ + New: func() interface{} { + return jsoniter.Parse(jsoniter.ConfigFastest, nil, 1024) + }, +} + +func borrowIterator(reader io.Reader) *jsoniter.Iterator { + iter := iterPool.Get().(*jsoniter.Iterator) + iter.Reset(reader) + return iter +} + +func returnIterator(iter *jsoniter.Iterator) { + iter.Error = nil + iter.Attachment = nil + iterPool.Put(iter) +} + +// ReadJSON ... +func ReadJSON(out *PubSub, reader io.Reader) error { + iterator := borrowIterator(reader) + defer returnIterator(iterator) + return readJSONFromIterator(out, iterator) +} + +// readJSONFromIterator allows you to read the bytes reader as an PubSub +func readJSONFromIterator(out *PubSub, iterator *jsoniter.Iterator) error { + var ( + id string + endpointUri string + uriLocation string + resource string + version string + ) + + for key := iterator.ReadObject(); key != ""; key = iterator.ReadObject() { + // Check if we have some error in our error cache + if iterator.Error != nil { + return iterator.Error + } + + // If no specversion ... + switch key { + case "SubscriptionId": + id = iterator.ReadString() + case "EndpointUri": + endpointUri = iterator.ReadString() + case "UriLocation": + uriLocation = iterator.ReadString() + case "ResourceAddress": + resource = iterator.ReadString() + version = common.V2 + case "id": + id = iterator.ReadString() + case "endpointUri": + endpointUri = iterator.ReadString() + case "uriLocation": + uriLocation = iterator.ReadString() + case "resource": + log.Warningf("%s, resource is used instead of ResourceAddress", common.NONCOMPLIANT) + resource = iterator.ReadString() + version = common.V1 + default: + iterator.Skip() + } + } + + if iterator.Error != nil { + return iterator.Error + } + + out.SetID(id) + out.SetEndpointURI(endpointUri) //nolint:errcheck + out.SetURILocation(uriLocation) + out.SetResource(resource) + out.SetVersion(version) + + return nil +} + +// UnmarshalJSON implements the json unmarshal method used when this type is +// unmarshaled using json.Unmarshal. +func (d *PubSub) UnmarshalJSON(b []byte) error { + iterator := jsoniter.ConfigFastest.BorrowIterator(b) + defer jsoniter.ConfigFastest.ReturnIterator(iterator) + return readJSONFromIterator(d, iterator) +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_writer.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_writer.go index 25b75db..48bb495 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_writer.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub_writer.go @@ -25,6 +25,11 @@ import ( var _ Writer = (*PubSub)(nil) +// SetVersion implements EventWriter.SetVersion +func (ps *PubSub) SetVersion(version string) { + ps.Version = version +} + // SetResource implements EventWriter.SetResource func (ps *PubSub) SetResource(s string) error { matched, err := regexp.MatchString(`([^/]+(/{2,}[^/]+)?)`, s) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go new file mode 100644 index 0000000..6584cc0 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go @@ -0,0 +1,40 @@ +package subscriber + +import ( + "sync" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/subscriber" +) + +// Store defines subscribers connection store struct +type Store struct { + sync.RWMutex + // Store stores subscribers in a map + Store map[uuid.UUID]*subscriber.Subscriber +} + +// Set is a wrapper for setting the value of a key in the underlying map +func (ss *Store) Set(clientID uuid.UUID, val subscriber.Subscriber) { + ss.Lock() + defer ss.Unlock() + ss.Store[clientID] = &val +} + +// Get is a wrapper for Getting the value of a key in the underlying map +func (ss *Store) Get(clientID uuid.UUID) (subscriber.Subscriber, bool) { + ss.Lock() + defer ss.Unlock() + if s, ok := ss.Store[clientID]; ok { + return *s, true + } + return subscriber.Subscriber{}, false +} + +// Delete ... delete from store +func (ss *Store) Delete(clientID uuid.UUID) { + ss.Lock() + defer ss.Unlock() + delete(ss.Store, clientID) +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go new file mode 100644 index 0000000..002f978 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go @@ -0,0 +1 @@ +package subscriber diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go new file mode 100644 index 0000000..3c8f3fd --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go @@ -0,0 +1,112 @@ +package subscriber + +// Copyright 2022 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import ( + "strings" + "sync" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/channel" + + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" + "github.com/redhat-cne/sdk-go/pkg/types" +) + +// Status of the client connections +type Status int64 + +const ( + SetConnectionToFailAfter = 10 +) +const ( + // InActive client + InActive Status = iota + // Active Client + Active +) + +// Subscriber object holds client connections +type Subscriber struct { + // ClientID of the sub + // +required + ClientID uuid.UUID `json:"clientID" omit:"empty"` + // +required + SubStore *store.PubSubStore `json:"subStore" omit:"empty"` + // EndPointURI - A URI describing the subscriber link . + // +required + EndPointURI *types.URI `json:"endPointURI" omit:"empty"` + // Status ... + Status Status `json:"status" omit:"empty"` + // Action ... + Action channel.Status `json:"action" omit:"empty"` + // FailedCount ... + failedCount int +} + +// Get ... get pubsub +func (s *Subscriber) Get(subID string) pubsub.PubSub { + return s.SubStore.Get(subID) +} + +// IncFailCount ... +func (s *Subscriber) IncFailCount() { + s.failedCount++ + if s.failedCount >= SetConnectionToFailAfter { + s.Action = channel.DELETE + s.Status = InActive + } +} + +// ResetFailCount ... +func (s *Subscriber) ResetFailCount() { + s.failedCount = 0 +} + +func (s *Subscriber) FailedCount() int { + return s.failedCount +} + +// String returns a pretty-printed representation of the Event. +func (s *Subscriber) String() string { + b := strings.Builder{} + b.WriteString(" EndPointURI: " + s.GetEndPointURI() + "\n") + b.WriteString(" ID: " + s.GetClientID().String() + "\n") + b.WriteString(" sub :{") + if s.SubStore != nil { + for _, v := range s.SubStore.Store { + b.WriteString(" {") + b.WriteString(v.String() + "\n") + b.WriteString(" },") + } + } + b.WriteString(" }") + return b.String() +} + +// New create new subscriber +func New(clientID uuid.UUID) *Subscriber { + return &Subscriber{ + ClientID: clientID, + SubStore: &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: map[string]*pubsub.PubSub{}, + }, + EndPointURI: nil, + Status: 0, + } +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go new file mode 100644 index 0000000..7866033 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go @@ -0,0 +1,51 @@ +package subscriber + +import ( + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" +) + +// Copyright 2022 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Reader is the interface for reading through an event from attributes. +type Reader interface { + // GetClientID returns clientID + GetClientID() uuid.UUID + // GetStatus Get Status of the subscribers + GetStatus() Status + // String returns a pretty-printed representation of the PubSub. + String() string + // GetSubStore return pubsub data + GetSubStore() *store.PubSubStore + // GetEndPointURI EndPointURI return endpoint + GetEndPointURI() string +} + +// Writer is the interface for writing through an event onto attributes. +// If an error is thrown by a subcomponent, Writer caches the error +// internally and exposes errors with a call to Writer.Validate(). +type Writer interface { + // SetClientID Resource performs event.SetResource() + SetClientID(clientID uuid.UUID) + + // SetStatus SetID performs event.SetID. + SetStatus(status Status) + + // SetEndPointURI SetHealthEndPoint set health endpoint + SetEndPointURI(url string) error + + AddSubscription(sub ...pubsub.PubSub) +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go new file mode 100644 index 0000000..98bcf43 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go @@ -0,0 +1,44 @@ +package subscriber + +import ( + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/store" +) + +var _ Reader = (*Subscriber)(nil) + +// GetClientID ... Get subscriber ClientID +func (s *Subscriber) GetClientID() uuid.UUID { + return s.ClientID +} + +// GetEndPointURI EndPointURI returns uri location +func (s *Subscriber) GetEndPointURI() string { + return s.EndPointURI.String() +} + +// GetStatus of the client connection +func (s *Subscriber) GetStatus() Status { + return s.Status +} + +// GetSubStore get subscription store +func (s *Subscriber) GetSubStore() *store.PubSubStore { + return s.SubStore +} + +// CreateCloudEvents ... +func (s *Subscriber) CreateCloudEvents() (*cloudevents.Event, error) { + ce := cloudevents.NewEvent(cloudevents.VersionV03) + ce.SetDataContentType(cloudevents.ApplicationJSON) + ce.SetSpecVersion(cloudevents.VersionV03) + ce.SetType(channel.SUBSCRIBER.String()) + ce.SetSource("subscription-request") + ce.SetID(uuid.New().String()) + if err := ce.SetData(cloudevents.ApplicationJSON, s); err != nil { + return nil, err + } + return &ce, nil +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go new file mode 100644 index 0000000..5b9bad9 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go @@ -0,0 +1,53 @@ +package subscriber + +import ( + "fmt" + "net/url" + "strings" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/types" +) + +var _ Writer = (*Subscriber)(nil) + +// SetClientID ... +func (s *Subscriber) SetClientID(clientID uuid.UUID) { + s.ClientID = clientID +} + +// SetEndPointURI set uri location (return url) +func (s *Subscriber) SetEndPointURI(endPointURI string) error { + sEndPointURI := strings.TrimSpace(endPointURI) + if sEndPointURI == "" { + s.EndPointURI = nil + err := fmt.Errorf("EndPointURI is given empty string,should be valid url") + return err + } + endPointURL, err := url.Parse(sEndPointURI) + if err != nil { + return err + } + s.EndPointURI = &types.URI{URL: *endPointURL} + return nil +} + +// SetStatus set status of the connection +func (s *Subscriber) SetStatus(status Status) { + s.Status = status +} + +// AddSubscription ... +func (s *Subscriber) AddSubscription(subs ...pubsub.PubSub) { + for _, ss := range subs { + newS := &pubsub.PubSub{ + ID: ss.GetID(), + EndPointURI: nil, + URILocation: nil, + Resource: ss.Resource, + } + s.SubStore.Store[ss.GetID()] = newS + } +} diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go b/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go index 081a8ba..0375e2e 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/pubsub/pubsub.go @@ -44,10 +44,14 @@ var once sync.Once var mu sync.Mutex // NewPubSub create new publisher or subscriber -func NewPubSub(endPointURI *types.URI, resource string) pubsub.PubSub { +func NewPubSub(endPointURI *types.URI, resource string, version string) pubsub.PubSub { + if version == "" { + version = "1.0" + } return pubsub.PubSub{ EndPointURI: endPointURI, Resource: resource, + Version: version, } } @@ -124,6 +128,7 @@ func (p *API) GetFromPubStore(address string) (pubsub.PubSub, error) { for _, pub := range p.pubStore.Store { if pub.GetResource() == address { return pubsub.PubSub{ + Version: pub.Version, ID: pub.ID, EndPointURI: pub.EndPointURI, URILocation: pub.URILocation, @@ -139,6 +144,7 @@ func (p *API) GetFromSubStore(address string) (pubsub.PubSub, error) { for _, sub := range p.subStore.Store { if sub.GetResource() == address { return pubsub.PubSub{ + Version: sub.Version, ID: sub.ID, EndPointURI: sub.EndPointURI, URILocation: sub.URILocation, @@ -167,6 +173,7 @@ func (p *API) HasPublisher(address string) (pubsub.PubSub, bool) { // CreateSubscription create a subscription and store it in a file and cache func (p *API) CreateSubscription(sub pubsub.PubSub) (pubsub.PubSub, error) { + //TODO-V2: remove this from v2 since already checked this before calling if subExists, ok := p.HasSubscription(sub.GetResource()); ok { log.Warnf("there was already a subscription in the store,skipping creation %v", subExists) p.subStore.Set(sub.ID, subExists) @@ -176,7 +183,6 @@ func (p *API) CreateSubscription(sub pubsub.PubSub) (pubsub.PubSub, error) { sub.SetID(uuid.New().String()) } // persist the subscription - - //TODO:might want to use PVC to live beyond pod crash err := writeToFile(sub, fmt.Sprintf("%s/%s", p.storeFilePath, p.subFile)) if err != nil { log.Errorf("error writing to a store %v\n", err) @@ -199,7 +205,6 @@ func (p *API) CreatePublisher(pub pubsub.PubSub) (pubsub.PubSub, error) { pub.SetID(uuid.New().String()) } // persist the subscription - - //TODO:might want to use PVC to live beyond pod crash err := writeToFile(pub, fmt.Sprintf("%s/%s", p.storeFilePath, p.pubFile)) if err != nil { log.Errorf("error writing to a store %v\n", err) @@ -252,9 +257,13 @@ func (p *API) DeletePublisher(publisherID string) error { func (p *API) DeleteSubscription(subscriptionID string) error { log.Info("deleting subscription") if pub, ok := p.subStore.Store[subscriptionID]; ok { - err := deleteFromFile(*pub, fmt.Sprintf("%s/%s", p.storeFilePath, p.subFile)) - p.subStore.Delete(subscriptionID) - return err + if err := deleteFromFile(*pub, fmt.Sprintf("%s/%s", p.storeFilePath, p.subFile)); err != nil { + p.subStore.Delete(subscriptionID) + } else { + return err + } + } else { + return fmt.Errorf("subscription not found") } return nil } diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go new file mode 100644 index 0000000..002f978 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go @@ -0,0 +1 @@ +package subscriber diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go new file mode 100644 index 0000000..d7c5d4a --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go @@ -0,0 +1,492 @@ +package subscriber + +import ( + "encoding/json" + "fmt" + "io" + "os" + "sync" + + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/types" + + "github.com/pkg/errors" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" + + log "github.com/sirupsen/logrus" + + SubscriberStore "github.com/redhat-cne/sdk-go/pkg/store/subscriber" + "github.com/redhat-cne/sdk-go/pkg/subscriber" +) + +// API ... api methods for publisher subscriber +type API struct { + SubscriberStore *SubscriberStore.Store // each client will have one store + storeFilePath string // subscribers + transportEnabled bool // http is enabled +} + +var instance *API +var once sync.Once +var mu sync.Mutex + +// NewSubscriber create new subscribers connections +func NewSubscriber(clientID uuid.UUID) subscriber.Subscriber { + return subscriber.Subscriber{ + ClientID: clientID, + SubStore: &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: nil, + }, + EndPointURI: nil, + Status: 1, + } +} + +// New creates empty publisher or subscriber +func New() subscriber.Subscriber { + return subscriber.Subscriber{} +} + +// GetAPIInstance get event instance +func GetAPIInstance(storeFilePath string) *API { + once.Do(func() { + instance = &API{ + transportEnabled: true, + SubscriberStore: &SubscriberStore.Store{ + RWMutex: sync.RWMutex{}, + Store: map[uuid.UUID]*subscriber.Subscriber{}, + }, + storeFilePath: storeFilePath, + } + hasDir(storeFilePath) + instance.ReloadStore() + }) + return instance +} + +// ReloadStore reload store if there is any change or refresh is required +func (p *API) ReloadStore() { + // load for file + log.Infof("reloading subscribers from the store %s", p.storeFilePath) + if files, err := loadFileNamesFromDir(p.storeFilePath); err == nil { + for _, f := range files { + if b, err1 := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, f)); err1 == nil { + if len(b) > 0 { + var sub subscriber.Subscriber + var err2 error + if err2 = json.Unmarshal(b, &sub); err2 == nil { + p.SubscriberStore.Set(sub.ClientID, sub) + } else { + log.Errorf("error parsing subscriber %s \n %s", string(b), err2.Error()) + } + } + } + } + } + log.Infof("%d registered clients reloaded", len(p.SubscriberStore.Store)) + for k, v := range p.SubscriberStore.Store { + log.Infof("registered clients %s : %s", k, v.String()) + } +} + +// HasTransportEnabled ... +func (p *API) HasTransportEnabled() bool { + return p.transportEnabled +} + +// DisableTransport ... +func (p *API) DisableTransport() { + p.transportEnabled = false +} + +// EnableTransport ... +func (p *API) EnableTransport() { + p.transportEnabled = true +} + +// ClientCount .. client cound +func (p *API) ClientCount() int { + return len(p.SubscriberStore.Store) +} + +// GetSubFromSubscriptionsStore get data from publisher store +func (p *API) GetSubFromSubscriptionsStore(clientID uuid.UUID, address string) (pubsub.PubSub, error) { + if subscriber, ok := p.HasClient(clientID); ok { + for _, sub := range subscriber.SubStore.Store { + if sub.GetResource() == address { + return pubsub.PubSub{ + ID: sub.ID, + EndPointURI: sub.EndPointURI, + URILocation: sub.URILocation, + Resource: sub.Resource, + }, nil + } + } + } + + return pubsub.PubSub{}, fmt.Errorf("publisher not found for address %s", address) +} + +// HasSubscription check if the subscriptionOne is already exists in the store/cache +func (p *API) HasSubscription(clientID uuid.UUID, address string) (pubsub.PubSub, bool) { + if sub, err := p.GetSubFromSubscriptionsStore(clientID, address); err == nil { + return sub, true + } + return pubsub.PubSub{}, false +} + +// HasClient check if client is already exists in the store/cache +func (p *API) HasClient(clientID uuid.UUID) (*subscriber.Subscriber, bool) { + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + return &subscriber, true + } + return nil, false +} + +// CreateSubscription create a subscriptionOne and store it in a file and cache +func (p *API) CreateSubscription(clientID uuid.UUID, sub subscriber.Subscriber) (subscriptionClient *subscriber.Subscriber, err error) { + var ok bool + if subscriptionClient, ok = p.HasClient(clientID); !ok { + subscriptionClient = subscriber.New(clientID) + } + subscriptionClient.ResetFailCount() + _ = subscriptionClient.SetEndPointURI(sub.GetEndPointURI()) + subscriptionClient.SetStatus(subscriber.Active) + subscriptionClient.Action = channel.NEW + pubStore := subscriptionClient.GetSubStore() + var hasResource bool + for key, value := range sub.SubStore.Store { + hasResource = false + for _, s := range pubStore.Store { + if s.Resource == value.Resource { + hasResource = true + continue + } + } + if !hasResource { + if key == "" { + key = uuid.New().String() + } + subscriptionClient.SubStore.Set(key, *value) + } + } + p.SubscriberStore.Set(clientID, *subscriptionClient) + // persist the subscriptionOne - + err = writeToFile(*subscriptionClient, fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + if err != nil { + log.Errorf("error writing to a store %v\n", err) + return nil, err + } + log.Infof("subscription persisted into a file %s", fmt.Sprintf("%s/%s - content %s", p.storeFilePath, fmt.Sprintf("%s.json", clientID), subscriptionClient.String())) + // store the publisher + + return subscriptionClient, nil +} + +// GetSubscriptionClient get a clientID by id +func (p *API) GetSubscriptionClient(clientID uuid.UUID) (subscriber.Subscriber, error) { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + return subs, nil + } + return subscriber.Subscriber{}, fmt.Errorf("subscriber data was not found for id %s", clientID) +} + +// GetSubscriptionsFromFile get subscriptions data from the file store +func (p *API) GetSubscriptionsFromFile(clientID uuid.UUID) ([]byte, error) { + b, err := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID.String()))) + return b, err +} + +// GetSubscriptions get all subscriptionOne inforamtions +func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + sub = subs.SubStore.Store + } + + return +} + +// GetSubscription get subscriptionOne inforamtions +func (p *API) GetSubscription(clientID uuid.UUID, subID string) pubsub.PubSub { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + return subs.Get(subID) + } + return pubsub.PubSub{} +} + +// GetSubscriberURLByResourceAndClientID get subscription information by client id/resource +func (p *API) GetSubscriberURLByResourceAndClientID(clientID uuid.UUID, resource string) (url *string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + if subs.ClientID == clientID { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + return func(s string) *string { + return &s + }(subs.GetEndPointURI()) + } + } + } + } + return nil +} + +// GetSubscriberURLByResource get subscriptionOne information +func (p *API) GetSubscriberURLByResource(resource string) (urls []string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + urls = append(urls, subs.GetEndPointURI()) + } + } + } + return urls +} + +// GetClientIDByResource get subscriptionOne information +func (p *API) GetClientIDByResource(resource string) (clientIDs []uuid.UUID) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + clientIDs = append(clientIDs, subs.ClientID) + } + } + } + return clientIDs +} + +// GetClientIDBySubID ... +func (p *API) GetClientIDBySubID(subID string) (clientIDs []uuid.UUID) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetID() == subID { + clientIDs = append(clientIDs, subs.ClientID) + } + } + } + return clientIDs +} + +// GetClientIDAddressByResource get subscriptionOne information +func (p *API) GetClientIDAddressByResource(resource string) map[uuid.UUID]*types.URI { + clients := map[uuid.UUID]*types.URI{} + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + clients[subs.ClientID] = subs.EndPointURI + } + } + } + return clients +} + +// DeleteSubscription delete a subscriptionOne by id +func (p *API) DeleteSubscription(clientID uuid.UUID, subscriptionID string) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { // client found + if sub, ok2 := subStore.SubStore.Store[subscriptionID]; ok2 { + err := deleteFromFile(*sub, fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + subStore.SubStore.Delete(subscriptionID) + p.SubscriberStore.Set(clientID, subStore) + return err + } + } + return nil +} + +// DeleteAllSubscriptions delete all subscriptionOne information +func (p *API) DeleteAllSubscriptions(clientID uuid.UUID) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { + return err + } + subStore.SubStore = &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: map[string]*pubsub.PubSub{}, + } + p.SubscriberStore.Set(clientID, subStore) + } + return nil +} + +// DeleteClient delete all subscriptionOne information +func (p *API) DeleteClient(clientID uuid.UUID) error { + if _, ok := p.SubscriberStore.Get(clientID); ok { // client found + log.Infof("delete from file %s", + fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { + return err + } + p.SubscriberStore.Delete(clientID) + } else { + log.Infof("subscription for client id %s not found", clientID) + } + return nil +} + +// UpdateStatus .. update status +func (p *API) UpdateStatus(clientID uuid.UUID, status subscriber.Status) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + subStore.SetStatus(status) + p.SubscriberStore.Set(clientID, subStore) + // do not write to file , if restarts it will consider all client are active + } else { + return errors.New("failed to update subscriber status") + } + return nil +} + +// IncFailCountToFail .. update fail count +func (p *API) IncFailCountToFail(clientID uuid.UUID) bool { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + subStore.IncFailCount() + p.SubscriberStore.Set(clientID, subStore) + if subStore.Action == channel.DELETE { + return true + } + } + return false +} + +// FailCountThreshold .. get threshold +func (p *API) FailCountThreshold() int { + return subscriber.SetConnectionToFailAfter +} + +func (p *API) GetFailCount(clientID uuid.UUID) int { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + return subStore.FailedCount() + } + return 0 +} + +// SubscriberMarkedForDelete ... +func (p *API) SubscriberMarkedForDelete(clientID uuid.UUID) bool { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + if subStore.Action == channel.DELETE { + return true + } + } + return false +} + +// deleteAllFromFile deletes publisher and subscriptionOne information from the file system +func deleteAllFromFile(filePath string) error { + return os.Remove(filePath) +} + +// DeleteFromFile is used to delete subscriptionOne from the file system +func deleteFromFile(sub pubsub.PubSub, filePath string) error { + var persistedSubClient subscriber.Subscriber + //open file + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err := io.ReadAll(file) + if err != nil { + return err + } + + if len(b) > 0 { + err = json.Unmarshal(b, &persistedSubClient) + if err != nil { + return err + } + } + delete(persistedSubClient.SubStore.Store, sub.ID) + + newBytes, err := json.MarshalIndent(&persistedSubClient, "", " ") + if err != nil { + log.Errorf("error deleting sub %v", err) + return err + } + return os.WriteFile(filePath, newBytes, 0666) +} + +// loadFromFile is used to read subscriptionOne/publisher from the file system +func loadFromFile(filePath string) (b []byte, err error) { + //open file + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err = io.ReadAll(file) + if err != nil { + return nil, err + } + return b, nil +} + +func loadFileNamesFromDir(filePath string) (subFiles []string, err error) { + files, err := os.ReadDir(filePath) + if err != nil { + return subFiles, err + } + for _, file := range files { + if !file.IsDir() { + subFiles = append(subFiles, file.Name()) + } + } + return +} + +// writeToFile writes subscriptionOne data to a file +func writeToFile(subscriberClient subscriber.Subscriber, filePath string) error { + //open file + mu.Lock() + defer mu.Unlock() + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err := io.ReadAll(file) + if err != nil { + return err + } + + var persistedSubClient subscriber.Subscriber + if len(b) > 0 { + err = json.Unmarshal(b, &persistedSubClient) + if err != nil { + return err + } + } else { + persistedSubClient = *subscriber.New(subscriberClient.ClientID) + } // no file found + _ = persistedSubClient.SetEndPointURI(subscriberClient.GetEndPointURI()) + persistedSubClient.SetStatus(subscriber.Active) + for subID, sub := range subscriberClient.SubStore.Store { + persistedSubClient.SubStore.Store[subID] = sub + } + + newBytes, err := json.MarshalIndent(&persistedSubClient, "", " ") + if err != nil { + return err + } + log.Infof("persisting following contents %s to a file %s\n", string(newBytes), filePath) + return os.WriteFile(filePath, newBytes, 0666) +} + +func hasDir(path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + _ = os.Mkdir(path, 0700) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e43282a..6090f78 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -48,6 +48,9 @@ github.com/modern-go/concurrent # github.com/modern-go/reflect2 v1.0.2 ## explicit; go 1.12 github.com/modern-go/reflect2 +# github.com/pkg/errors v0.9.1 +## explicit +github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.0 ## explicit github.com/pmezard/go-difflib/difflib @@ -68,19 +71,23 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac +# github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go ## explicit; go 1.22 github.com/redhat-cne/sdk-go/pkg/channel +github.com/redhat-cne/sdk-go/pkg/common github.com/redhat-cne/sdk-go/pkg/event github.com/redhat-cne/sdk-go/pkg/event/ptp github.com/redhat-cne/sdk-go/pkg/event/redfish github.com/redhat-cne/sdk-go/pkg/pubsub github.com/redhat-cne/sdk-go/pkg/store +github.com/redhat-cne/sdk-go/pkg/store/subscriber +github.com/redhat-cne/sdk-go/pkg/subscriber github.com/redhat-cne/sdk-go/pkg/types github.com/redhat-cne/sdk-go/pkg/util/clock github.com/redhat-cne/sdk-go/pkg/util/wait github.com/redhat-cne/sdk-go/v1/event github.com/redhat-cne/sdk-go/v1/pubsub +github.com/redhat-cne/sdk-go/v1/subscriber # github.com/rogpeppe/go-internal v1.10.0 ## explicit; go 1.19 # github.com/sirupsen/logrus v1.8.1