diff --git a/go.mod b/go.mod index 7765270..28cf756 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect diff --git a/v2/routes.go b/v2/routes.go new file mode 100644 index 0000000..b95904b --- /dev/null +++ b/v2/routes.go @@ -0,0 +1,522 @@ +// Copyright 2020 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package restapi + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "time" + + "github.com/redhat-cne/sdk-go/pkg/types" + + "github.com/redhat-cne/rest-api/pkg/localmetrics" + + cloudevents "github.com/cloudevents/sdk-go/v2" + ce "github.com/cloudevents/sdk-go/v2/event" + "github.com/redhat-cne/sdk-go/pkg/subscriber" + + cne "github.com/redhat-cne/sdk-go/pkg/event" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + + "github.com/redhat-cne/sdk-go/v1/event" + + "github.com/google/uuid" + "github.com/gorilla/mux" + + "github.com/redhat-cne/sdk-go/pkg/channel" + + "log" + "net/http" +) + +// createSubscription create subscription and send it to a channel that is shared by middleware to process +// Creates a new subscription . +// If subscription exists with same resource then existing subscription is returned . +// responses: +// +// 201: repoResp +// 400: badReq +// 204: noContent +func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + var response *http.Response + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + sub := pubsub.PubSub{} + if err = json.Unmarshal(bodyBytes, &sub); err != nil { + respondWithError(w, "marshalling error") + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + endPointURI := sub.GetEndpointURI() + if endPointURI != "" { + response, err = s.HTTPClient.Post(sub.GetEndpointURI(), cloudevents.ApplicationJSON, nil) + if err != nil { + log.Printf("there was error validating endpointurl %v, subscription wont be created", err) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + respondWithError(w, err.Error()) + return + } + defer response.Body.Close() + if response.StatusCode != http.StatusNoContent { + log.Printf("there was an error validating endpointurl %s returned status code %d", sub.GetEndpointURI(), response.StatusCode) + respondWithError(w, "return url validation check failed for create subscription.check endpointURI") + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + return + } + } + // check sub.EndpointURI by get + id := uuid.New().String() + sub.SetID(id) + _ = sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck + + newSub, err := s.pubSubAPI.CreateSubscription(sub) + if err != nil { + log.Printf("error creating subscription %v", err) + localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) + respondWithError(w, err.Error()) + return + } + log.Printf("subscription created successfully.") + + addr := newSub.GetResource() + // create unique clientId for each subscription based on endPointURI + subs := subscriber.New(s.getClientIDFromURI(endPointURI)) + + _ = subs.SetEndPointURI(endPointURI) + + // create a subscriber model + // subs.AddSubscription(newSub) + subs.AddSubscription(newSub) + subs.Action = channel.NEW + + cevent, _ := subs.CreateCloudEvents() + cevent.SetSubject(channel.NEW.String()) + cevent.SetSource(addr) + + out := channel.DataChan{ + Address: addr, + Data: cevent, + Status: channel.NEW, + Type: channel.SUBSCRIBER, + } + + var updatedObj *subscriber.Subscriber + // writes a file .json that has the same content as configMap. + // configMap was created later as a way to persist the data. + if updatedObj, err = s.subscriberAPI.CreateSubscription(subs.ClientID, *subs); err != nil { + log.Printf("failed creating subscription for %s", subs.ClientID.String()) + out.Status = channel.FAILED + } else { + out.Status = channel.SUCCESS + _ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj) + // TODO: this function is in sdk-go + // localmetrics.UpdateSenderCreatedCount(obj.ClientID.String(), localmetrics.ACTIVE, 1) + + log.Printf("subscription created successfully.") + localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1) + respondWithJSON(w, http.StatusCreated, newSub) + } + + s.dataOut <- &out +} + +// createPublisher create publisher and send it to a channel that is shared by middleware to process +// Creates a new publisher . +// If publisher exists with same resource then existing publisher is returned . +// responses: +// +// 201: repoResp +// 400: badReq +// 204: noContent +func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + var response *http.Response + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + pub := pubsub.PubSub{} + if err = json.Unmarshal(bodyBytes, &pub); err != nil { + localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) + respondWithError(w, "marshalling error") + return + } + if pub.GetEndpointURI() != "" { + response, err = s.HTTPClient.Post(pub.GetEndpointURI(), cloudevents.ApplicationJSON, nil) + if err != nil { + log.Printf("there was an error validating the publisher endpointurl %v, publisher won't be created.", err) + localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) + respondWithError(w, err.Error()) + return + } + defer response.Body.Close() + if response.StatusCode != http.StatusNoContent { + log.Printf("there was an error validating endpointurl %s returned status code %d", pub.GetEndpointURI(), response.StatusCode) + localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) + respondWithError(w, "return url validation check failed for create publisher,check endpointURI") + return + } + } + + // check pub.EndpointURI by get + pub.SetID(uuid.New().String()) + _ = pub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "publishers", pub.ID)) //nolint:errcheck + newPub, err := s.pubSubAPI.CreatePublisher(pub) + if err != nil { + log.Printf("error creating publisher %v", err) + localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1) + respondWithError(w, err.Error()) + return + } + log.Printf("publisher created successfully.") + // go ahead and create QDR to this address + s.sendOut(channel.PUBLISHER, &newPub) + localmetrics.UpdatePublisherCount(localmetrics.ACTIVE, 1) + respondWithJSON(w, http.StatusCreated, newPub) +} + +func (s *Server) sendOut(eType channel.Type, sub *pubsub.PubSub) { + // go ahead and create QDR to this address + s.dataOut <- &channel.DataChan{ + ID: sub.GetID(), + Address: sub.GetResource(), + Data: &ce.Event{}, + Type: eType, + Status: channel.NEW, + } +} + +func (s *Server) sendOutToDelete(eType channel.Type, sub *pubsub.PubSub) { + // go ahead and create QDR to this address + s.dataOut <- &channel.DataChan{ + ID: sub.GetID(), + Address: sub.GetResource(), + Data: &ce.Event{}, + Type: eType, + Status: channel.DELETE, + } +} + +func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + subscriptionID, ok := queries["subscriptionid"] + if !ok { + respondWithError(w, "subscription not found") + return + } + sub, err := s.pubSubAPI.GetSubscription(subscriptionID) + if err != nil { + respondWithError(w, "subscription not found") + return + } + respondWithJSON(w, http.StatusOK, sub) +} + +func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + publisherID, ok := queries["publisherid"] + if !ok { + respondWithError(w, "publisher parameter is required") + return + } + pub, err := s.pubSubAPI.GetPublisher(publisherID) + if err != nil { + respondWithError(w, "publisher not found") + return + } + respondWithJSON(w, http.StatusOK, pub) +} +func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) { + b, err := s.pubSubAPI.GetSubscriptionsFromFile() + if err != nil { + respondWithError(w, "error loading subscriber data") + return + } + respondWithByte(w, http.StatusOK, b) +} + +func (s *Server) getPublishers(w http.ResponseWriter, _ *http.Request) { + b, err := s.pubSubAPI.GetPublishersFromFile() + if err != nil { + respondWithError(w, "error loading publishers data") + return + } + respondWithByte(w, http.StatusOK, b) +} + +func (s *Server) deletePublisher(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + publisherID, ok := queries["publisherid"] + if !ok { + respondWithError(w, "publisherid param is missing") + return + } + + if err := s.pubSubAPI.DeletePublisher(publisherID); err != nil { + localmetrics.UpdatePublisherCount(localmetrics.FAILDELETE, 1) + respondWithError(w, err.Error()) + return + } + + localmetrics.UpdatePublisherCount(localmetrics.ACTIVE, -1) + respondWithMessage(w, http.StatusOK, "OK") +} + +func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + subscriptionID, ok := queries["subscriptionid"] + if !ok { + respondWithError(w, "subscriptionid param is missing") + return + } + + if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil { + localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) + respondWithError(w, err.Error()) + return + } + + localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, -1) + respondWithMessage(w, http.StatusOK, "OK") +} + +func (s *Server) deleteAllSubscriptions(w http.ResponseWriter, _ *http.Request) { + size := len(s.pubSubAPI.GetSubscriptions()) + if err := s.pubSubAPI.DeleteAllSubscriptions(); err != nil { + respondWithError(w, err.Error()) + return + } + //update metrics + if size > 0 { + localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, -(size)) + } + // go ahead and create QDR to this address + s.sendOutToDelete(channel.SUBSCRIBER, &pubsub.PubSub{ID: "", Resource: "delete-all-subscriptions"}) + respondWithMessage(w, http.StatusOK, "deleted all subscriptions") +} + +func (s *Server) deleteAllPublishers(w http.ResponseWriter, _ *http.Request) { + size := len(s.pubSubAPI.GetPublishers()) + + if err := s.pubSubAPI.DeleteAllPublishers(); err != nil { + respondWithError(w, err.Error()) + return + } + //update metrics + if size > 0 { + localmetrics.UpdatePublisherCount(localmetrics.ACTIVE, -(size)) + } + respondWithMessage(w, http.StatusOK, "deleted all publishers") +} + +// publishEvent gets cloud native events and converts it to cloud event and publishes to a transport to send +// it to the consumer +func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, err.Error()) + return + } + cneEvent := event.CloudNativeEvent() + if err = json.Unmarshal(bodyBytes, &cneEvent); err != nil { + respondWithError(w, err.Error()) + return + } // check if publisher is found + pub, err := s.pubSubAPI.GetPublisher(cneEvent.ID) + if err != nil { + localmetrics.UpdateEventPublishedCount(cneEvent.ID, localmetrics.FAIL, 1) + respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID)) + return + } + ceEvent, err := cneEvent.NewCloudEvent(&pub) + if err != nil { + localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1) + respondWithError(w, err.Error()) + } else { + s.dataOut <- &channel.DataChan{ + Type: channel.EVENT, + Data: ceEvent, + Address: pub.GetResource(), + } + localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.SUCCESS, 1) + respondWithMessage(w, http.StatusAccepted, "Event sent") + } +} + +// getCurrentState get current status of the events that are subscribed to +func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + resourceAddress, ok := queries["resourceAddress"] + if !ok { + respondWithError(w, "resourceAddress parameter not found") + return + } + + log.Printf("DZK num of subscriptions=%d, num of publishers=%d", len(s.pubSubAPI.GetSubscriptions()), len(s.pubSubAPI.GetPublishers())) + + //identify publisher or subscriber is asking for status + var sub *pubsub.PubSub + if len(s.pubSubAPI.GetSubscriptions()) > 0 { + for _, subscriptions := range s.pubSubAPI.GetSubscriptions() { + if strings.Contains(subscriptions.GetResource(), resourceAddress) { + sub = subscriptions + log.Printf("DZK found subscription %v", sub) + break + } + } + } else if len(s.pubSubAPI.GetPublishers()) > 0 { + for _, publishers := range s.pubSubAPI.GetPublishers() { + if strings.Contains(publishers.GetResource(), resourceAddress) { + sub = publishers + log.Printf("DZK found publisher %v", sub) + break + } + } + } else { + respondWithError(w, "subscription not found") + return + } + + if sub == nil { + respondWithError(w, "subscription not found") + return + } + + if resourceAddress == "" { + _ = json.NewEncoder(w).Encode(map[string]string{"message": "validation failed, resource is empty"}) + } + + if !strings.HasPrefix(resourceAddress, "/") { + resourceAddress = fmt.Sprintf("/%s", resourceAddress) + } + // this is placeholder not sending back to report + out := channel.DataChan{ + Address: resourceAddress, + // ClientID is not used + ClientID: uuid.New(), + Status: channel.NEW, + Type: channel.STATUS, // could be new event of new subscriber (sender) + } + + e, _ := out.CreateCloudEvents(CURRENTSTATE) + e.SetSource(resourceAddress) + // statusReceiveOverrideFn must return value for + if s.statusReceiveOverrideFn != nil { + if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { + respondWithError(w, statusErr.Error()) + } else if out.Data != nil { + respondWithJSON(w, http.StatusOK, *out.Data) + } else { + respondWithError(w, "event not found") + } + } else { + respondWithError(w, "onReceive function not defined") + } +} + +// pingForSubscribedEventStatus sends ping to the listening address in the producer to fire all status as events +func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Request) { + queries := mux.Vars(r) + subscriptionID, ok := queries["subscriptionid"] + if !ok { + respondWithError(w, "subscription parameter not found") + return + } + sub, err := s.pubSubAPI.GetSubscription(subscriptionID) + if err != nil { + respondWithError(w, "subscription not found") + return + } + cneEvent := event.CloudNativeEvent() + cneEvent.SetID(sub.ID) + cneEvent.Type = "status_check" + cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) + cneEvent.SetDataContentType(cloudevents.ApplicationJSON) + cneEvent.SetData(cne.Data{ + + Version: "v1", + }) + ceEvent, err := cneEvent.NewCloudEvent(&sub) + + if err != nil { + respondWithError(w, err.Error()) + } else { + s.dataOut <- &channel.DataChan{ + Type: channel.STATUS, + StatusChan: nil, + Data: ceEvent, + Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"), + } + respondWithMessage(w, http.StatusAccepted, "ping sent") + } +} + +// logEvent gets cloud native events and converts it to cloud native event and writes to log +func (s *Server) logEvent(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, err.Error()) + return + } + cneEvent := event.CloudNativeEvent() + if err := json.Unmarshal(bodyBytes, &cneEvent); err != nil { + respondWithError(w, err.Error()) + return + } // check if publisher is found + log.Printf("event received %v", cneEvent) + respondWithMessage(w, http.StatusAccepted, "Event published to log") +} + +func (s *Server) getClientIDFromURI(uri string) uuid.UUID { + return uuid.NewMD5(uuid.NameSpaceURL, []byte(uri)) +} + +func dummy(w http.ResponseWriter, _ *http.Request) { + respondWithMessage(w, http.StatusNoContent, "dummy test") +} + +func respondWithError(w http.ResponseWriter, message string) { + respondWithJSON(w, http.StatusBadRequest, map[string]string{"error": message}) +} + +func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) { + if response, err := json.Marshal(payload); err == nil { + w.Header().Set("Content-Type", cloudevents.ApplicationJSON) + w.WriteHeader(code) + w.Write(response) //nolint:errcheck + } else { + respondWithMessage(w, http.StatusBadRequest, "error parsing event data") + } +} + +func respondWithMessage(w http.ResponseWriter, code int, message string) { + w.Header().Set("Content-Type", cloudevents.ApplicationJSON) + respondWithJSON(w, code, map[string]string{"status": message}) +} + +func respondWithByte(w http.ResponseWriter, code int, message []byte) { + w.Header().Set("Content-Type", cloudevents.ApplicationJSON) + w.WriteHeader(code) + w.Write(message) //nolint:errcheck +} diff --git a/v2/server.go b/v2/server.go new file mode 100644 index 0000000..2951fb1 --- /dev/null +++ b/v2/server.go @@ -0,0 +1,371 @@ +// Package restapi Pub/Sub Rest API. +// +// Rest API spec . +// +// Terms Of Service: +// +// Schemes: http, https +// Host: localhost:8089 +// basePath: /api/ocloudNotifications/v1 +// Version: 1.0.0 +// Contact: Aneesh Puttur +// +// Consumes: +// - application/json +// +// Produces: +// - application/json +// +// swagger:meta +package restapi + +import ( + "fmt" + + "github.com/redhat-cne/sdk-go/pkg/util/wait" + + "sync" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/gorilla/mux" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/event" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/types" + pubsubv1 "github.com/redhat-cne/sdk-go/v1/pubsub" + subscriberApi "github.com/redhat-cne/sdk-go/v1/subscriber" + + "io" + "net/http" + "strings" + "time" + + log "github.com/sirupsen/logrus" +) + +var once sync.Once + +// ServerInstance ... is singleton instance +var ServerInstance *Server +var healthCheckPause = 2 * time.Second + +type serverStatus int + +const HTTPReadHeaderTimeout = 2 * time.Second + +const ( + starting = iota + started + notReady + failed + CURRENTSTATE = "CurrentState" +) + +// Server defines rest routes server object +type Server struct { + port int + apiPath string + //use dataOut chanel to write to configMap + dataOut chan<- *channel.DataChan + closeCh <-chan struct{} + HTTPClient *http.Client + httpServer *http.Server + pubSubAPI *pubsubv1.API + subscriberAPI *subscriberApi.API + status serverStatus + statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error +} + +// publisher/subscription data model +// swagger:response pubSubResp +type swaggPubSubRes struct { //nolint:deadcode,unused + // in:body + Body pubsub.PubSub +} + +// PubSub request model +// swagger:response eventResp +type swaggPubSubEventRes struct { //nolint:deadcode,unused + // in:body + Body event.Event +} + +// Error Bad Request +// swagger:response badReq +type swaggReqBadRequest struct { //nolint:deadcode,unused + // in:body + Body struct { + // HTTP status code 400 - Bad Request + Code int `json:"code" example:"400"` + } +} + +// Error Not Found +// swagger:response notFoundReq +type swaggReqNotFound struct { //nolint:deadcode,unused + // in:body + Body struct { + // HTTP status code 404 - Not Found + Code int `json:"code" example:"404"` + } +} + +// Accepted +// swagger:response acceptedReq +type swaggReqAccepted struct { //nolint:deadcode,unused + // in:body + Body struct { + // HTTP status code 202 - Accepted + Code int `json:"code" example:"202"` + } +} + +// InitServer is used to supply configurations for rest routes server +func InitServer(port int, apiPath, storePath string, + dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, + onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server { + once.Do(func() { + ServerInstance = &Server{ + port: port, + apiPath: apiPath, + dataOut: dataOut, + closeCh: closeCh, + status: notReady, + HTTPClient: &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 20, + }, + Timeout: 10 * time.Second, + }, + pubSubAPI: pubsubv1.GetAPIInstance(storePath), + subscriberAPI: subscriberApi.GetAPIInstance(storePath), + statusReceiveOverrideFn: onStatusReceiveOverrideFn, + } + }) + // singleton + return ServerInstance +} + +// EndPointHealthChk checks for rest service health +func (s *Server) EndPointHealthChk() (err error) { + log.Info("checking for rest service health\n") + for i := 0; i <= 5; i++ { + if !s.Ready() { + time.Sleep(healthCheckPause) + log.Printf("server status %t", s.Ready()) + continue + } + + log.Debugf("health check %s%s ", s.GetHostPath(), "health") + response, errResp := http.Get(fmt.Sprintf("%s%s", s.GetHostPath(), "health")) + if errResp != nil { + log.Errorf("try %d, return health check of the rest service for error %v", i, errResp) + time.Sleep(healthCheckPause) + err = errResp + continue + } + if response != nil && response.StatusCode == http.StatusOK { + response.Body.Close() + log.Infof("rest service returned healthy status") + time.Sleep(healthCheckPause) + err = nil + return + } + response.Body.Close() + } + if err != nil { + err = fmt.Errorf("error connecting to rest api %s", err.Error()) + } + return +} + +// Port port id +func (s *Server) Port() int { + return s.port +} + +// Ready gives the status of the server +func (s *Server) Ready() bool { + return s.status == started +} + +// GetHostPath returns hostpath +func (s *Server) GetHostPath() *types.URI { + return types.ParseURI(fmt.Sprintf("http://localhost:%d%s", s.port, s.apiPath)) +} + +// Start will start res routes service +func (s *Server) Start() { + if s.status == started || s.status == starting { + log.Infof("Server is already running at port %d", s.port) + return + } + s.status = starting + r := mux.NewRouter() + + api := r.PathPrefix(s.apiPath).Subrouter() + + // createSubscription create subscription and send it to a channel that is shared by middleware to process + // swagger:operation POST /subscriptions subscription createSubscription + // --- + // summary: Creates a new subscription. + // description: If subscription creation is success(or if already exists), subscription will be returned with Created (201). + // parameters: + // - name: subscription + // description: subscription to add to the list of subscriptions + // in: body + // schema: + // "$ref": "#/definitions/PubSub" + // responses: + // "201": + // "$ref": "#/responses/pubSubResp" + // "400": + // "$ref": "#/responses/badReq" + api.HandleFunc("/subscriptions", s.createSubscription).Methods(http.MethodPost) + + //createPublisher create publisher and send it to a channel that is shared by middleware to process + // swagger:operation POST /publishers/ publishers createPublisher + // --- + // summary: Creates a new publisher. + // description: If publisher creation is success(or if already exists), publisher will be returned with Created (201). + // parameters: + // - name: publisher + // description: publisher to add to the list of publishers + // in: body + // schema: + // "$ref": "#/definitions/PubSub" + // responses: + // "201": + // "$ref": "#/responses/pubSubResp" + // "400": + // "$ref": "#/responses/badReq" + api.HandleFunc("/publishers", s.createPublisher).Methods(http.MethodPost) + /* + this method a list of subscription object(s) and their associated properties + 200 Returns the subscription resources and their associated properties that already exist. + See note below. + 404 Subscription resources are not available (not created). + */ + api.HandleFunc("/subscriptions", s.getSubscriptions).Methods(http.MethodGet) + //publishers create publisher and send it to a channel that is shared by middleware to process + // swagger:operation GET /publishers/ publishers getPublishers + // --- + // summary: Get publishers. + // description: If publisher creation is success(or if already exists), publisher will be returned with Created (201). + // parameters: + // responses: + // "200": + // "$ref": "#/responses/publishers" + // "404": + // "$ref": "#/responses/notFound" + api.HandleFunc("/publishers", s.getPublishers).Methods(http.MethodGet) + // 200 and 404 + api.HandleFunc("/subscriptions/{subscriptionid}", s.getSubscriptionByID).Methods(http.MethodGet) + api.HandleFunc("/publishers/{publisherid}", s.getPublisherByID).Methods(http.MethodGet) + // 204 on success or 404 + api.HandleFunc("/subscriptions/{subscriptionid}", s.deleteSubscription).Methods(http.MethodDelete) + api.HandleFunc("/publishers/{publisherid}", s.deletePublisher).Methods(http.MethodDelete) + + api.HandleFunc("/subscriptions", s.deleteAllSubscriptions).Methods(http.MethodDelete) + api.HandleFunc("/publishers", s.deleteAllPublishers).Methods(http.MethodDelete) + + //pingForSubscribedEventStatus pings for event status if the publisher has capability to push event on demand + // swagger:operation POST /subscriptions/status subscriptions pingForSubscribedEventStatus + // --- + // summary: Get status of publishing events. + // description: If publisher status ping is success, call will be returned with status accepted. + // parameters: + // - name: subscriptionid + // description: subscription id to check status for + // responses: + // "201": + // "$ref": "#/responses/pubSubResp" + // "400": + // "$ref": "#/responses/badReq" + api.HandleFunc("/subscriptions/status/{subscriptionid}", s.pingForSubscribedEventStatus).Methods(http.MethodPut) + + api.HandleFunc("/{resourceAddress:.*}/CurrentState", s.getCurrentState).Methods(http.MethodGet) + + api.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) { + io.WriteString(w, "OK") //nolint:errcheck + }).Methods(http.MethodGet) + + api.HandleFunc("/dummy", dummy).Methods(http.MethodPost) + api.HandleFunc("/log", s.logEvent).Methods(http.MethodPost) + + //publishEvent create event and send it to a channel that is shared by middleware to process + // swagger:operation POST /create/event/ event publishEvent + // --- + // summary: Creates a new event. + // description: If publisher is present for the event, then event creation is success and be returned with Accepted (202). + // parameters: + // - name: event + // description: event along with publisher id + // in: body + // schema: + // "$ref": "#/definitions/Event" + // responses: + // "202": + // "$ref": "#/responses/acceptedReq" + // "400": + // "$ref": "#/responses/badReq" + api.HandleFunc("/create/event", s.publishEvent).Methods(http.MethodPost) + + err := r.Walk(func(route *mux.Route, _ *mux.Router, _ []*mux.Route) error { + pathTemplate, err := route.GetPathTemplate() + if err == nil { + log.Println("ROUTE:", pathTemplate) + } + pathRegexp, err := route.GetPathRegexp() + if err == nil { + log.Println("Path regexp:", pathRegexp) + } + queriesTemplates, err := route.GetQueriesTemplates() + if err == nil { + log.Println("Queries templates:", strings.Join(queriesTemplates, ",")) + } + queriesRegexps, err := route.GetQueriesRegexp() + if err == nil { + log.Println("Queries regexps:", strings.Join(queriesRegexps, ",")) + } + methods, err := route.GetMethods() + if err == nil { + log.Println("Methods:", strings.Join(methods, ",")) + } + log.Println() + return nil + }) + + if err != nil { + log.Println(err) + } + api.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, r) + }) + + log.Infof("starting v2 rest api server at port %d, endpoint %s", s.port, s.apiPath) + go wait.Until(func() { + s.status = started + s.httpServer = &http.Server{ + ReadHeaderTimeout: HTTPReadHeaderTimeout, + Addr: fmt.Sprintf(":%d", s.port), + Handler: api, + } + err := s.httpServer.ListenAndServe() + if err != nil { + log.Errorf("restarting due to error with api server %s\n", err.Error()) + s.status = failed + } + }, 1*time.Second, s.closeCh) +} + +// Shutdown ... shutdown rest service api, but it will not close until close chan is called +func (s *Server) Shutdown() { + log.Warnf("trying to shutdown rest api sever, please use close channel to shutdown ") + s.httpServer.Close() +} + +// SetOnStatusReceiveOverrideFn ... sets receiver function +func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) { + s.statusReceiveOverrideFn = fn +} diff --git a/v2/server_test.go b/v2/server_test.go new file mode 100644 index 0000000..f8a9ed6 --- /dev/null +++ b/v2/server_test.go @@ -0,0 +1,596 @@ +// Copyright 2020 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package restapi_test + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "sync" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + types2 "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/uuid" + + restapi "github.com/redhat-cne/rest-api" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/event" + "github.com/redhat-cne/sdk-go/pkg/event/ptp" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/types" + v1event "github.com/redhat-cne/sdk-go/v1/event" + api "github.com/redhat-cne/sdk-go/v1/pubsub" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" +) + +var ( + server *restapi.Server + + eventOutCh chan *channel.DataChan + closeCh chan struct{} + wg sync.WaitGroup + port = 8989 + apPath = "/routes/cne/v1/" + resource = "test/test" + resourceNoneSubscribed = "test/nonesubscribed" + storePath = "." + ObjSub pubsub.PubSub + ObjPub pubsub.PubSub +) + +func init() { + eventOutCh = make(chan *channel.DataChan, 10) + closeCh = make(chan struct{}) +} + +func TestMain(m *testing.M) { + server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh) + //start http server + server.Start() + + wg.Add(1) + go func() { + for d := range eventOutCh { + if d.Type == channel.STATUS && d.StatusChan != nil { + clientID, _ := uuid.Parse("123e4567-e89b-12d3-a456-426614174001") + cneEvent := v1event.CloudNativeEvent() + cneEvent.SetID(ObjPub.ID) + cneEvent.Type = string(ptp.PtpStateChange) + cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) + cneEvent.SetDataContentType(event.ApplicationJSON) + data := event.Data{ + Version: "event", + Values: []event.DataValue{{ + Resource: "test", + DataType: event.NOTIFICATION, + ValueType: event.ENUMERATION, + Value: ptp.ACQUIRING_SYNC, + }, + }, + } + data.SetVersion("v1") //nolint:errcheck + cneEvent.SetData(data) + e := cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: string(ptp.PtpStateChange), + Source: cloudevents.URIRef{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/source"}}, + ID: "status event", + Time: &cloudevents.Timestamp{Time: time.Date(2020, 03, 21, 12, 34, 56, 780000000, time.UTC)}, + DataSchema: &types2.URI{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/schema"}}, + Subject: func(s string) *string { return &s }("topic"), + }.AsV1(), + } + _ = e.SetData(cloudevents.ApplicationJSON, cneEvent) + func() { + defer func() { + if err := recover(); err != nil { + log.Errorf("error on close channel") + } + }() + if d.Address == resourceNoneSubscribed { + d.StatusChan <- &channel.StatusChan{ + ID: "123", + ClientID: clientID, + Data: &e, + StatusCode: http.StatusBadRequest, + Message: []byte("Client not subscribed"), + } + } else { + d.StatusChan <- &channel.StatusChan{ + ID: "123", + ClientID: clientID, + Data: &e, + StatusCode: http.StatusOK, + Message: []byte("ok"), + } + } + }() + } + log.Infof("incoming data %#v", d) + } + }() + time.Sleep(2 * time.Second) + port = server.Port() + exitVal := m.Run() + os.Exit(exitVal) +} + +func TestServer_Health(t *testing.T) { + // CHECK URL IS UP + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "health"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestServer_CreateSubscription(t *testing.T) { + // create subscription + sub := api.NewPubSub( + &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, + resource) + + data, err := json.Marshal(&sub) + assert.Nil(t, err) + assert.NotNil(t, data) + /// create new subscription + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data)) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + bodyString := string(bodyBytes) + log.Print(bodyString) + assert.Equal(t, http.StatusCreated, resp.StatusCode) + err = json.Unmarshal(bodyBytes, &ObjSub) + assert.Nil(t, err) + assert.NotEmpty(t, ObjSub.ID) + assert.NotEmpty(t, ObjSub.URILocation) + assert.NotEmpty(t, ObjSub.EndPointURI) + assert.NotEmpty(t, ObjSub.Resource) + assert.Equal(t, sub.Resource, ObjSub.Resource) + log.Infof("Subscription:\n%s", ObjSub.String()) +} + +func TestServer_GetSubscription(t *testing.T) { + // Get Just Created Subscription + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + rSub := api.New() + err = json.Unmarshal(bodyBytes, &rSub) + if e, ok := err.(*json.SyntaxError); ok { + log.Infof("syntax error at byte offset %d", e.Offset) + } + assert.Nil(t, err) + assert.Equal(t, rSub.ID, ObjSub.ID) +} + +func TestServer_CreatePublisher(t *testing.T) { + pub := pubsub.PubSub{ + ID: "", + EndPointURI: &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, + Resource: resource, + } + pubData, err := json.Marshal(&pub) + assert.Nil(t, err) + assert.NotNil(t, pubData) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "publishers"), bytes.NewBuffer(pubData)) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + pubBodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + err = json.Unmarshal(pubBodyBytes, &ObjPub) + assert.Nil(t, err) + assert.Equal(t, http.StatusCreated, resp.StatusCode) + assert.NotEmpty(t, ObjPub.ID) + assert.NotEmpty(t, ObjPub.URILocation) + assert.NotEmpty(t, ObjPub.EndPointURI) + assert.NotEmpty(t, ObjPub.Resource) + assert.Equal(t, pub.Resource, ObjPub.Resource) + log.Infof("publisher \n%s", ObjPub.String()) +} + +func TestServer_GetPublisher(t *testing.T) { + // Get Just created Publisher + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "publishers", ObjPub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + pubBodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + var rPub pubsub.PubSub + log.Printf("the data %s", string(pubBodyBytes)) + err = json.Unmarshal(pubBodyBytes, &rPub) + assert.Equal(t, resp.StatusCode, http.StatusOK) + assert.Nil(t, err) + assert.Equal(t, ObjPub.ID, rPub.ID) +} + +func TestServer_ListSubscriptions(t *testing.T) { + // Get All Subscriptions + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() // Close body only if response non-nil + bodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + var subList []pubsub.PubSub + log.Printf("TestServer_ListSubscriptions :%s\n", string(bodyBytes)) + err = json.Unmarshal(bodyBytes, &subList) + assert.Nil(t, err) + assert.Greater(t, len(subList), 0) +} + +func TestServer_ListPublishers(t *testing.T) { + // Get All Publisher + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "publishers"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + pubBodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + var pubList []pubsub.PubSub + err = json.Unmarshal(pubBodyBytes, &pubList) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Greater(t, len(pubList), 0) +} + +func TestServer_GetCurrentState_withSubscription(t *testing.T) { + // create subscription + sub := api.NewPubSub( + &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, + resource) + + data, err := json.Marshal(&sub) + assert.Nil(t, err) + assert.NotNil(t, data) + /// create new subscription + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data)) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + bodyString := string(bodyBytes) + log.Print(bodyString) + assert.Equal(t, http.StatusCreated, resp.StatusCode) + err = json.Unmarshal(bodyBytes, &ObjSub) + assert.Nil(t, err) + assert.NotEmpty(t, ObjSub.ID) + assert.NotEmpty(t, ObjSub.URILocation) + assert.NotEmpty(t, ObjSub.EndPointURI) + assert.NotEmpty(t, ObjSub.Resource) + assert.Equal(t, sub.Resource, ObjSub.Resource) + log.Infof("Subscription:\n%s", ObjSub.String()) + + // try getting event + time.Sleep(2 * time.Second) + req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err = server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + s, err2 := io.ReadAll(resp.Body) + assert.Nil(t, err2) + log.Infof("tedt %s ", string(s)) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + // Delete All Subscriptions + req, err = http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err = server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() +} + +func TestServer_GetCurrentState_withoutSubscription(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // try getting event + time.Sleep(2 * time.Second) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, resourceNoneSubscribed, "CurrentState"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + s, err2 := io.ReadAll(resp.Body) + assert.Nil(t, err2) + log.Infof("tedt %s ", string(s)) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestServer_TestPingStatusStatusCode(t *testing.T) { + req, err := http.NewRequest("PUT", fmt.Sprintf("http://localhost:%d%s%s%s", port, apPath, "subscriptions/status/", ObjSub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestServer_DeleteSubscription(t *testing.T) { + // Delete All Subscriptions + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() +} + +func TestServer_DeletePublisher(t *testing.T) { + // Delete All Publisher + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "publishers"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() +} + +func TestServer_GetNonExistingPublisher(t *testing.T) { + // Get Just created Publisher + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "publishers", ObjPub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) + assert.Nil(t, err) +} + +func TestServer_GetNonExistingSubscription(t *testing.T) { + // Get Just Created Subscription + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestServer_TestDummyStatusCode(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "dummy"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusNoContent, resp.StatusCode) +} + +func TestServer_KillAndRecover(t *testing.T) { + server.Shutdown() + time.Sleep(2 * time.Second) + // CHECK URL IS UP + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "health"), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +// New get new rest client +func NewRestClient() *Rest { + return &Rest{ + client: http.Client{ + Timeout: 1 * time.Second, + }, + } +} + +func publishEvent(e event.Event) { + //create publisher + url := &types.URI{URL: url.URL{Scheme: "http", + Host: fmt.Sprintf("localhost:%d", port), + Path: fmt.Sprintf("%s%s", apPath, "create/event")}} + rc := NewRestClient() + err := rc.PostEvent(url, e) + if err != nil { + log.Errorf("error publishing events %v to url %s", err, url.String()) + } else { + log.Debugf("published event %s", e.ID) + } +} + +func Test_MultiplePost(t *testing.T) { + pub := pubsub.PubSub{ + ID: "", + EndPointURI: &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, + Resource: resource, + } + pubData, err := json.Marshal(&pub) + assert.Nil(t, err) + assert.NotNil(t, pubData) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "publishers"), bytes.NewBuffer(pubData)) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + pubBodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + err = json.Unmarshal(pubBodyBytes, &ObjPub) + assert.Nil(t, err) + assert.Equal(t, http.StatusCreated, resp.StatusCode) + assert.NotEmpty(t, ObjPub.ID) + assert.NotEmpty(t, ObjPub.URILocation) + assert.NotEmpty(t, ObjPub.EndPointURI) + assert.NotEmpty(t, ObjPub.Resource) + assert.Equal(t, pub.Resource, ObjPub.Resource) + log.Infof("publisher \n%s", ObjPub.String()) + + cneEvent := v1event.CloudNativeEvent() + cneEvent.SetID(ObjPub.ID) + cneEvent.Type = string(ptp.PtpStateChange) + cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) + cneEvent.SetDataContentType(event.ApplicationJSON) + data := event.Data{ + Version: "event", + Values: []event.DataValue{{ + Resource: "test", + DataType: event.NOTIFICATION, + ValueType: event.ENUMERATION, + Value: ptp.ACQUIRING_SYNC, + }, + }, + } + data.SetVersion("v1") //nolint:errcheck + cneEvent.SetData(data) + for i := 0; i < 5; i++ { + go publishEvent(cneEvent) + } + time.Sleep(2 * time.Second) +} + +func TestServer_End(*testing.T) { + close(eventOutCh) + close(closeCh) +} + +// Rest client to make http request +type Rest struct { + client http.Client +} + +// Post post with data +func (r *Rest) Post(url *types.URI, data []byte) int { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + request, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(data)) + if err != nil { + log.Errorf("error creating post request %v", err) + return http.StatusBadRequest + } + request.Header.Set("content-type", "application/json") + response, err := r.client.Do(request) + if err != nil { + log.Errorf("error in post response %v", err) + return http.StatusBadRequest + } + if response.Body != nil { + defer response.Body.Close() + // read any content and print + body, readErr := io.ReadAll(response.Body) + if readErr == nil && len(body) > 0 { + log.Debugf("%s return response %s\n", url.String(), string(body)) + } + } + return response.StatusCode +} + +// New get new rest client +func New() *Rest { + return &Rest{ + client: http.Client{ + Timeout: 1 * time.Second, + }, + } +} + +// PostEvent post an event to the give url and check for error +func (r *Rest) PostEvent(url *types.URI, e event.Event) error { + b, err := json.Marshal(e) + if err != nil { + log.Errorf("error marshalling event %v", e) + return err + } + if status := r.Post(url, b); status == http.StatusBadRequest { + return fmt.Errorf("post returned status %d", status) + } + return nil +} diff --git a/v2/swagger.json b/v2/swagger.json new file mode 100644 index 0000000..e4337c9 --- /dev/null +++ b/v2/swagger.json @@ -0,0 +1,366 @@ +{ + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "schemes": [ + "http", + "https" + ], + "swagger": "2.0", + "info": { + "description": "Rest API spec .", + "title": "Pub/Sub Rest API.", + "contact": { + "name": "Aneesh Puttur", + "email": "aputtur@redhat.com" + }, + "version": "1.0.0" + }, + "host": "localhost:8089", + "basePath": "/api/ocloudNotifications/v1", + "paths": { + "/create/event/": { + "post": { + "description": "If publisher is present for the event, then event creation is success and be returned with Accepted (202).", + "tags": [ + "event" + ], + "summary": "Creates a new event.", + "operationId": "publishEvent", + "parameters": [ + { + "description": "event along with publisher id", + "name": "event", + "in": "body", + "schema": { + "$ref": "#/definitions/Event" + } + } + ], + "responses": { + "202": { + "$ref": "#/responses/acceptedReq" + }, + "400": { + "$ref": "#/responses/badReq" + } + } + } + }, + "/publishers/": { + "get": { + "description": "If publisher creation is success(or if already exists), publisher will be returned with Created (201).", + "tags": [ + "publishers" + ], + "summary": "Get publishers.", + "operationId": "getPublishers", + "responses": { + "200": { + "$ref": "#/responses/publishers" + }, + "404": { + "$ref": "#/responses/notFound" + } + } + }, + "post": { + "description": "If publisher creation is success(or if already exists), publisher will be returned with Created (201).", + "tags": [ + "publishers" + ], + "summary": "Creates a new publisher.", + "operationId": "createPublisher", + "parameters": [ + { + "description": "publisher to add to the list of publishers", + "name": "publisher", + "in": "body", + "schema": { + "$ref": "#/definitions/PubSub" + } + } + ], + "responses": { + "201": { + "$ref": "#/responses/pubSubResp" + }, + "400": { + "$ref": "#/responses/badReq" + } + } + } + }, + "/subscriptions": { + "post": { + "description": "If subscription creation is success(or if already exists), subscription will be returned with Created (201).", + "tags": [ + "subscription" + ], + "summary": "Creates a new subscription.", + "operationId": "createSubscription", + "parameters": [ + { + "description": "subscription to add to the list of subscriptions", + "name": "subscription", + "in": "body", + "schema": { + "$ref": "#/definitions/PubSub" + } + } + ], + "responses": { + "201": { + "$ref": "#/responses/pubSubResp" + }, + "400": { + "$ref": "#/responses/badReq" + } + } + } + }, + "/subscriptions/status": { + "post": { + "description": "If publisher status ping is success, call will be returned with status accepted.", + "tags": [ + "subscriptions" + ], + "summary": "Get status of publishing events.", + "operationId": "pingForSubscribedEventStatus", + "parameters": [ + { + "description": "subscription id to check status for", + "name": "subscriptionid" + } + ], + "responses": { + "201": { + "$ref": "#/responses/pubSubResp" + }, + "400": { + "$ref": "#/responses/badReq" + } + } + } + } + }, + "definitions": { + "Data": { + "description": "{\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}, {\n\"resource\": \"/redfish/v1/Systems\",\n\"dataType\": \"notification\",\n\"valueType\": \"redfish-event\",\n\"value\": {\n\"@odata.context\": \"/redfish/v1/$metadata#Event.Event\",\n\"@odata.type\": \"#Event.v1_3_0.Event\",\n\"Context\": \"any string is valid\",\n\"Events\": [{\"EventId\": \"2162\", \"MemberId\": \"615703\", \"MessageId\": \"TMP0100\"}],\n\"Id\": \"5e004f5a-e3d1-11eb-ae9c-3448edf18a38\",\n\"Name\": \"Event Array\"\n}\n}]\n}", + "type": "object", + "title": "Data ... cloud native events data\nData Json payload is as follows,", + "properties": { + "values": { + "type": "array", + "items": { + "$ref": "#/definitions/DataValue" + }, + "x-go-name": "Values" + }, + "version": { + "type": "string", + "x-go-name": "Version" + } + }, + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" + }, + "DataType": { + "type": "string", + "title": "DataType ...", + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" + }, + "DataValue": { + "description": "{\n\"resource\": \"/cluster/node/ptp\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}", + "type": "object", + "title": "DataValue ...\nDataValue Json payload is as follows,", + "properties": { + "dataType": { + "$ref": "#/definitions/DataType" + }, + "resource": { + "type": "string", + "x-go-name": "Resource" + }, + "value": { + "x-go-name": "Value" + }, + "valueType": { + "$ref": "#/definitions/ValueType" + } + }, + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" + }, + "Event": { + "description": "{\n\"id\": \"5ce55d17-9234-4fee-a589-d0f10cb32b8e\",\n\"type\": \"event.sync.sync-status.synchronization-state-change\",\n\"source\": \"/cluster/node/example.com/ptp/clock_realtime\",\n\"time\": \"2021-02-05T17:31:00Z\",\n\"data\": {\n\"version\": \"v1.0\",\n\"values\": [{\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"notification\",\n\"valueType\": \"enumeration\",\n\"value\": \"ACQUIRING-SYNC\"\n}, {\n\"resource\": \"/sync/sync-status/sync-state\",\n\"dataType\": \"metric\",\n\"valueType\": \"decimal64.3\",\n\"value\": 100.3\n}]\n}\n}\n\nEvent request model", + "type": "object", + "title": "Event represents the canonical representation of a Cloud Native Event.\nEvent Json payload is as follows,", + "properties": { + "data": { + "$ref": "#/definitions/Data" + }, + "dataContentType": { + "description": "DataContentType - the Data content type\n+required", + "type": "string", + "x-go-name": "DataContentType" + }, + "dataSchema": { + "$ref": "#/definitions/URI" + }, + "id": { + "description": "ID of the event; must be non-empty and unique within the scope of the producer.\n+required", + "type": "string", + "x-go-name": "ID" + }, + "source": { + "description": "Source - The source of the occurrence which has happened.\n+required", + "type": "string", + "x-go-name": "Source" + }, + "time": { + "description": "Time - A Timestamp when the event happened.\n+required", + "type": "string", + "x-go-name": "Time" + }, + "type": { + "description": "Type - The type of the occurrence which has happened.\n+required", + "type": "string", + "x-go-name": "Type" + } + }, + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" + }, + "PubSub": { + "description": "{\n\"id\": \"789be75d-7ac3-472e-bbbc-6d62878aad4a\",\n\"endpointUri\": \"http://localhost:9090/ack/event\",\n\"uriLocation\": \"http://localhost:8080/api/ocloudNotifications/v1/publishers/{publisherid}\",\n\"resource\": \"/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state\"\n}\n\nPubSub request model", + "type": "object", + "title": "PubSub represents the canonical representation of a Cloud Native Event Publisher and Sender .\nPubSub Json request payload is as follows,", + "properties": { + "endpointUri": { + "$ref": "#/definitions/URI" + }, + "id": { + "description": "ID of the pub/sub; is updated on successful creation of publisher/subscription.", + "type": "string", + "x-go-name": "ID" + }, + "resource": { + "description": "Resource - The type of the Resource.\n+required", + "type": "string", + "x-go-name": "Resource" + }, + "uriLocation": { + "$ref": "#/definitions/URI" + } + }, + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/pubsub" + }, + "URI": { + "description": "URI is a wrapper to url.URL. It is intended to enforce compliance with\nthe Cloud Native Events spec for their definition of URI. Custom\nmarshal methods are implemented to ensure the outbound URI object\nis a flat string.", + "type": "object", + "properties": { + "ForceQuery": { + "type": "boolean" + }, + "Fragment": { + "type": "string" + }, + "Host": { + "type": "string" + }, + "OmitHost": { + "type": "boolean" + }, + "Opaque": { + "type": "string" + }, + "Path": { + "type": "string" + }, + "RawFragment": { + "type": "string" + }, + "RawPath": { + "type": "string" + }, + "RawQuery": { + "type": "string" + }, + "Scheme": { + "type": "string" + }, + "User": { + "$ref": "#/definitions/Userinfo" + } + }, + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/types" + }, + "Userinfo": { + "description": "The Userinfo type is an immutable encapsulation of username and\npassword details for a URL. An existing Userinfo value is guaranteed\nto have a username set (potentially empty, as allowed by RFC 2396),\nand optionally a password.", + "type": "object", + "x-go-package": "net/url" + }, + "ValueType": { + "type": "string", + "title": "ValueType ...", + "x-go-package": "github.com/redhat-cne/sdk-go/pkg/event" + } + }, + "responses": { + "acceptedReq": { + "description": "Accepted", + "schema": { + "type": "object", + "properties": { + "code": { + "description": "HTTP status code 202 - Accepted", + "type": "integer", + "format": "int64", + "x-go-name": "Code" + } + } + } + }, + "badReq": { + "description": "Error Bad Request", + "schema": { + "type": "object", + "properties": { + "code": { + "description": "HTTP status code 400 - Bad Request", + "type": "integer", + "format": "int64", + "x-go-name": "Code" + } + } + } + }, + "eventResp": { + "description": "PubSub request model", + "schema": { + "$ref": "#/definitions/Event" + } + }, + "notFoundReq": { + "description": "Error Not Found", + "schema": { + "type": "object", + "properties": { + "code": { + "description": "HTTP status code 404 - Not Found", + "type": "integer", + "format": "int64", + "x-go-name": "Code" + } + } + } + }, + "pubSubResp": { + "description": "publisher/subscription data model", + "schema": { + "$ref": "#/definitions/PubSub" + } + } + } +} \ No newline at end of file diff --git a/vendor/github.com/pkg/errors/.gitignore b/vendor/github.com/pkg/errors/.gitignore new file mode 100644 index 0000000..daf913b --- /dev/null +++ b/vendor/github.com/pkg/errors/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/pkg/errors/.travis.yml b/vendor/github.com/pkg/errors/.travis.yml new file mode 100644 index 0000000..9159de0 --- /dev/null +++ b/vendor/github.com/pkg/errors/.travis.yml @@ -0,0 +1,10 @@ +language: go +go_import_path: github.com/pkg/errors +go: + - 1.11.x + - 1.12.x + - 1.13.x + - tip + +script: + - make check diff --git a/vendor/github.com/pkg/errors/LICENSE b/vendor/github.com/pkg/errors/LICENSE new file mode 100644 index 0000000..835ba3e --- /dev/null +++ b/vendor/github.com/pkg/errors/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2015, Dave Cheney +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/pkg/errors/Makefile b/vendor/github.com/pkg/errors/Makefile new file mode 100644 index 0000000..ce9d7cd --- /dev/null +++ b/vendor/github.com/pkg/errors/Makefile @@ -0,0 +1,44 @@ +PKGS := github.com/pkg/errors +SRCDIRS := $(shell go list -f '{{.Dir}}' $(PKGS)) +GO := go + +check: test vet gofmt misspell unconvert staticcheck ineffassign unparam + +test: + $(GO) test $(PKGS) + +vet: | test + $(GO) vet $(PKGS) + +staticcheck: + $(GO) get honnef.co/go/tools/cmd/staticcheck + staticcheck -checks all $(PKGS) + +misspell: + $(GO) get github.com/client9/misspell/cmd/misspell + misspell \ + -locale GB \ + -error \ + *.md *.go + +unconvert: + $(GO) get github.com/mdempsky/unconvert + unconvert -v $(PKGS) + +ineffassign: + $(GO) get github.com/gordonklaus/ineffassign + find $(SRCDIRS) -name '*.go' | xargs ineffassign + +pedantic: check errcheck + +unparam: + $(GO) get mvdan.cc/unparam + unparam ./... + +errcheck: + $(GO) get github.com/kisielk/errcheck + errcheck $(PKGS) + +gofmt: + @echo Checking code is gofmted + @test -z "$(shell gofmt -s -l -d -e $(SRCDIRS) | tee /dev/stderr)" diff --git a/vendor/github.com/pkg/errors/README.md b/vendor/github.com/pkg/errors/README.md new file mode 100644 index 0000000..54dfdcb --- /dev/null +++ b/vendor/github.com/pkg/errors/README.md @@ -0,0 +1,59 @@ +# errors [![Travis-CI](https://travis-ci.org/pkg/errors.svg)](https://travis-ci.org/pkg/errors) [![AppVeyor](https://ci.appveyor.com/api/projects/status/b98mptawhudj53ep/branch/master?svg=true)](https://ci.appveyor.com/project/davecheney/errors/branch/master) [![GoDoc](https://godoc.org/github.com/pkg/errors?status.svg)](http://godoc.org/github.com/pkg/errors) [![Report card](https://goreportcard.com/badge/github.com/pkg/errors)](https://goreportcard.com/report/github.com/pkg/errors) [![Sourcegraph](https://sourcegraph.com/github.com/pkg/errors/-/badge.svg)](https://sourcegraph.com/github.com/pkg/errors?badge) + +Package errors provides simple error handling primitives. + +`go get github.com/pkg/errors` + +The traditional error handling idiom in Go is roughly akin to +```go +if err != nil { + return err +} +``` +which applied recursively up the call stack results in error reports without context or debugging information. The errors package allows programmers to add context to the failure path in their code in a way that does not destroy the original value of the error. + +## Adding context to an error + +The errors.Wrap function returns a new error that adds context to the original error. For example +```go +_, err := ioutil.ReadAll(r) +if err != nil { + return errors.Wrap(err, "read failed") +} +``` +## Retrieving the cause of an error + +Using `errors.Wrap` constructs a stack of errors, adding context to the preceding error. Depending on the nature of the error it may be necessary to reverse the operation of errors.Wrap to retrieve the original error for inspection. Any error value which implements this interface can be inspected by `errors.Cause`. +```go +type causer interface { + Cause() error +} +``` +`errors.Cause` will recursively retrieve the topmost error which does not implement `causer`, which is assumed to be the original cause. For example: +```go +switch err := errors.Cause(err).(type) { +case *MyError: + // handle specifically +default: + // unknown error +} +``` + +[Read the package documentation for more information](https://godoc.org/github.com/pkg/errors). + +## Roadmap + +With the upcoming [Go2 error proposals](https://go.googlesource.com/proposal/+/master/design/go2draft.md) this package is moving into maintenance mode. The roadmap for a 1.0 release is as follows: + +- 0.9. Remove pre Go 1.9 and Go 1.10 support, address outstanding pull requests (if possible) +- 1.0. Final release. + +## Contributing + +Because of the Go2 errors changes, this package is not accepting proposals for new functionality. With that said, we welcome pull requests, bug fixes and issue reports. + +Before sending a PR, please discuss your change by raising an issue. + +## License + +BSD-2-Clause diff --git a/vendor/github.com/pkg/errors/appveyor.yml b/vendor/github.com/pkg/errors/appveyor.yml new file mode 100644 index 0000000..a932ead --- /dev/null +++ b/vendor/github.com/pkg/errors/appveyor.yml @@ -0,0 +1,32 @@ +version: build-{build}.{branch} + +clone_folder: C:\gopath\src\github.com\pkg\errors +shallow_clone: true # for startup speed + +environment: + GOPATH: C:\gopath + +platform: + - x64 + +# http://www.appveyor.com/docs/installed-software +install: + # some helpful output for debugging builds + - go version + - go env + # pre-installed MinGW at C:\MinGW is 32bit only + # but MSYS2 at C:\msys64 has mingw64 + - set PATH=C:\msys64\mingw64\bin;%PATH% + - gcc --version + - g++ --version + +build_script: + - go install -v ./... + +test_script: + - set PATH=C:\gopath\bin;%PATH% + - go test -v ./... + +#artifacts: +# - path: '%GOPATH%\bin\*.exe' +deploy: off diff --git a/vendor/github.com/pkg/errors/errors.go b/vendor/github.com/pkg/errors/errors.go new file mode 100644 index 0000000..161aea2 --- /dev/null +++ b/vendor/github.com/pkg/errors/errors.go @@ -0,0 +1,288 @@ +// Package errors provides simple error handling primitives. +// +// The traditional error handling idiom in Go is roughly akin to +// +// if err != nil { +// return err +// } +// +// which when applied recursively up the call stack results in error reports +// without context or debugging information. The errors package allows +// programmers to add context to the failure path in their code in a way +// that does not destroy the original value of the error. +// +// Adding context to an error +// +// The errors.Wrap function returns a new error that adds context to the +// original error by recording a stack trace at the point Wrap is called, +// together with the supplied message. For example +// +// _, err := ioutil.ReadAll(r) +// if err != nil { +// return errors.Wrap(err, "read failed") +// } +// +// If additional control is required, the errors.WithStack and +// errors.WithMessage functions destructure errors.Wrap into its component +// operations: annotating an error with a stack trace and with a message, +// respectively. +// +// Retrieving the cause of an error +// +// Using errors.Wrap constructs a stack of errors, adding context to the +// preceding error. Depending on the nature of the error it may be necessary +// to reverse the operation of errors.Wrap to retrieve the original error +// for inspection. Any error value which implements this interface +// +// type causer interface { +// Cause() error +// } +// +// can be inspected by errors.Cause. errors.Cause will recursively retrieve +// the topmost error that does not implement causer, which is assumed to be +// the original cause. For example: +// +// switch err := errors.Cause(err).(type) { +// case *MyError: +// // handle specifically +// default: +// // unknown error +// } +// +// Although the causer interface is not exported by this package, it is +// considered a part of its stable public interface. +// +// Formatted printing of errors +// +// All error values returned from this package implement fmt.Formatter and can +// be formatted by the fmt package. The following verbs are supported: +// +// %s print the error. If the error has a Cause it will be +// printed recursively. +// %v see %s +// %+v extended format. Each Frame of the error's StackTrace will +// be printed in detail. +// +// Retrieving the stack trace of an error or wrapper +// +// New, Errorf, Wrap, and Wrapf record a stack trace at the point they are +// invoked. This information can be retrieved with the following interface: +// +// type stackTracer interface { +// StackTrace() errors.StackTrace +// } +// +// The returned errors.StackTrace type is defined as +// +// type StackTrace []Frame +// +// The Frame type represents a call site in the stack trace. Frame supports +// the fmt.Formatter interface that can be used for printing information about +// the stack trace of this error. For example: +// +// if err, ok := err.(stackTracer); ok { +// for _, f := range err.StackTrace() { +// fmt.Printf("%+s:%d\n", f, f) +// } +// } +// +// Although the stackTracer interface is not exported by this package, it is +// considered a part of its stable public interface. +// +// See the documentation for Frame.Format for more details. +package errors + +import ( + "fmt" + "io" +) + +// New returns an error with the supplied message. +// New also records the stack trace at the point it was called. +func New(message string) error { + return &fundamental{ + msg: message, + stack: callers(), + } +} + +// Errorf formats according to a format specifier and returns the string +// as a value that satisfies error. +// Errorf also records the stack trace at the point it was called. +func Errorf(format string, args ...interface{}) error { + return &fundamental{ + msg: fmt.Sprintf(format, args...), + stack: callers(), + } +} + +// fundamental is an error that has a message and a stack, but no caller. +type fundamental struct { + msg string + *stack +} + +func (f *fundamental) Error() string { return f.msg } + +func (f *fundamental) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + io.WriteString(s, f.msg) + f.stack.Format(s, verb) + return + } + fallthrough + case 's': + io.WriteString(s, f.msg) + case 'q': + fmt.Fprintf(s, "%q", f.msg) + } +} + +// WithStack annotates err with a stack trace at the point WithStack was called. +// If err is nil, WithStack returns nil. +func WithStack(err error) error { + if err == nil { + return nil + } + return &withStack{ + err, + callers(), + } +} + +type withStack struct { + error + *stack +} + +func (w *withStack) Cause() error { return w.error } + +// Unwrap provides compatibility for Go 1.13 error chains. +func (w *withStack) Unwrap() error { return w.error } + +func (w *withStack) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + fmt.Fprintf(s, "%+v", w.Cause()) + w.stack.Format(s, verb) + return + } + fallthrough + case 's': + io.WriteString(s, w.Error()) + case 'q': + fmt.Fprintf(s, "%q", w.Error()) + } +} + +// Wrap returns an error annotating err with a stack trace +// at the point Wrap is called, and the supplied message. +// If err is nil, Wrap returns nil. +func Wrap(err error, message string) error { + if err == nil { + return nil + } + err = &withMessage{ + cause: err, + msg: message, + } + return &withStack{ + err, + callers(), + } +} + +// Wrapf returns an error annotating err with a stack trace +// at the point Wrapf is called, and the format specifier. +// If err is nil, Wrapf returns nil. +func Wrapf(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + err = &withMessage{ + cause: err, + msg: fmt.Sprintf(format, args...), + } + return &withStack{ + err, + callers(), + } +} + +// WithMessage annotates err with a new message. +// If err is nil, WithMessage returns nil. +func WithMessage(err error, message string) error { + if err == nil { + return nil + } + return &withMessage{ + cause: err, + msg: message, + } +} + +// WithMessagef annotates err with the format specifier. +// If err is nil, WithMessagef returns nil. +func WithMessagef(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + return &withMessage{ + cause: err, + msg: fmt.Sprintf(format, args...), + } +} + +type withMessage struct { + cause error + msg string +} + +func (w *withMessage) Error() string { return w.msg + ": " + w.cause.Error() } +func (w *withMessage) Cause() error { return w.cause } + +// Unwrap provides compatibility for Go 1.13 error chains. +func (w *withMessage) Unwrap() error { return w.cause } + +func (w *withMessage) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + fmt.Fprintf(s, "%+v\n", w.Cause()) + io.WriteString(s, w.msg) + return + } + fallthrough + case 's', 'q': + io.WriteString(s, w.Error()) + } +} + +// Cause returns the underlying cause of the error, if possible. +// An error value has a cause if it implements the following +// interface: +// +// type causer interface { +// Cause() error +// } +// +// If the error does not implement Cause, the original error will +// be returned. If the error is nil, nil will be returned without further +// investigation. +func Cause(err error) error { + type causer interface { + Cause() error + } + + for err != nil { + cause, ok := err.(causer) + if !ok { + break + } + err = cause.Cause() + } + return err +} diff --git a/vendor/github.com/pkg/errors/go113.go b/vendor/github.com/pkg/errors/go113.go new file mode 100644 index 0000000..be0d10d --- /dev/null +++ b/vendor/github.com/pkg/errors/go113.go @@ -0,0 +1,38 @@ +// +build go1.13 + +package errors + +import ( + stderrors "errors" +) + +// Is reports whether any error in err's chain matches target. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +func Is(err, target error) bool { return stderrors.Is(err, target) } + +// As finds the first error in err's chain that matches target, and if so, sets +// target to that error value and returns true. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error matches target if the error's concrete value is assignable to the value +// pointed to by target, or if the error has a method As(interface{}) bool such that +// As(target) returns true. In the latter case, the As method is responsible for +// setting target. +// +// As will panic if target is not a non-nil pointer to either a type that implements +// error, or to any interface type. As returns false if err is nil. +func As(err error, target interface{}) bool { return stderrors.As(err, target) } + +// Unwrap returns the result of calling the Unwrap method on err, if err's +// type contains an Unwrap method returning error. +// Otherwise, Unwrap returns nil. +func Unwrap(err error) error { + return stderrors.Unwrap(err) +} diff --git a/vendor/github.com/pkg/errors/stack.go b/vendor/github.com/pkg/errors/stack.go new file mode 100644 index 0000000..779a834 --- /dev/null +++ b/vendor/github.com/pkg/errors/stack.go @@ -0,0 +1,177 @@ +package errors + +import ( + "fmt" + "io" + "path" + "runtime" + "strconv" + "strings" +) + +// Frame represents a program counter inside a stack frame. +// For historical reasons if Frame is interpreted as a uintptr +// its value represents the program counter + 1. +type Frame uintptr + +// pc returns the program counter for this frame; +// multiple frames may have the same PC value. +func (f Frame) pc() uintptr { return uintptr(f) - 1 } + +// file returns the full path to the file that contains the +// function for this Frame's pc. +func (f Frame) file() string { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return "unknown" + } + file, _ := fn.FileLine(f.pc()) + return file +} + +// line returns the line number of source code of the +// function for this Frame's pc. +func (f Frame) line() int { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return 0 + } + _, line := fn.FileLine(f.pc()) + return line +} + +// name returns the name of this function, if known. +func (f Frame) name() string { + fn := runtime.FuncForPC(f.pc()) + if fn == nil { + return "unknown" + } + return fn.Name() +} + +// Format formats the frame according to the fmt.Formatter interface. +// +// %s source file +// %d source line +// %n function name +// %v equivalent to %s:%d +// +// Format accepts flags that alter the printing of some verbs, as follows: +// +// %+s function name and path of source file relative to the compile time +// GOPATH separated by \n\t (\n\t) +// %+v equivalent to %+s:%d +func (f Frame) Format(s fmt.State, verb rune) { + switch verb { + case 's': + switch { + case s.Flag('+'): + io.WriteString(s, f.name()) + io.WriteString(s, "\n\t") + io.WriteString(s, f.file()) + default: + io.WriteString(s, path.Base(f.file())) + } + case 'd': + io.WriteString(s, strconv.Itoa(f.line())) + case 'n': + io.WriteString(s, funcname(f.name())) + case 'v': + f.Format(s, 's') + io.WriteString(s, ":") + f.Format(s, 'd') + } +} + +// MarshalText formats a stacktrace Frame as a text string. The output is the +// same as that of fmt.Sprintf("%+v", f), but without newlines or tabs. +func (f Frame) MarshalText() ([]byte, error) { + name := f.name() + if name == "unknown" { + return []byte(name), nil + } + return []byte(fmt.Sprintf("%s %s:%d", name, f.file(), f.line())), nil +} + +// StackTrace is stack of Frames from innermost (newest) to outermost (oldest). +type StackTrace []Frame + +// Format formats the stack of Frames according to the fmt.Formatter interface. +// +// %s lists source files for each Frame in the stack +// %v lists the source file and line number for each Frame in the stack +// +// Format accepts flags that alter the printing of some verbs, as follows: +// +// %+v Prints filename, function, and line number for each Frame in the stack. +func (st StackTrace) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + switch { + case s.Flag('+'): + for _, f := range st { + io.WriteString(s, "\n") + f.Format(s, verb) + } + case s.Flag('#'): + fmt.Fprintf(s, "%#v", []Frame(st)) + default: + st.formatSlice(s, verb) + } + case 's': + st.formatSlice(s, verb) + } +} + +// formatSlice will format this StackTrace into the given buffer as a slice of +// Frame, only valid when called with '%s' or '%v'. +func (st StackTrace) formatSlice(s fmt.State, verb rune) { + io.WriteString(s, "[") + for i, f := range st { + if i > 0 { + io.WriteString(s, " ") + } + f.Format(s, verb) + } + io.WriteString(s, "]") +} + +// stack represents a stack of program counters. +type stack []uintptr + +func (s *stack) Format(st fmt.State, verb rune) { + switch verb { + case 'v': + switch { + case st.Flag('+'): + for _, pc := range *s { + f := Frame(pc) + fmt.Fprintf(st, "\n%+v", f) + } + } + } +} + +func (s *stack) StackTrace() StackTrace { + f := make([]Frame, len(*s)) + for i := 0; i < len(f); i++ { + f[i] = Frame((*s)[i]) + } + return f +} + +func callers() *stack { + const depth = 32 + var pcs [depth]uintptr + n := runtime.Callers(3, pcs[:]) + var st stack = pcs[0:n] + return &st +} + +// funcname removes the path prefix component of a function's name reported by func.Name(). +func funcname(name string) string { + i := strings.LastIndex(name, "/") + name = name[i+1:] + i = strings.Index(name, ".") + return name[i+1:] +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go new file mode 100644 index 0000000..6584cc0 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/store/subscriber/subscriber.go @@ -0,0 +1,40 @@ +package subscriber + +import ( + "sync" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/subscriber" +) + +// Store defines subscribers connection store struct +type Store struct { + sync.RWMutex + // Store stores subscribers in a map + Store map[uuid.UUID]*subscriber.Subscriber +} + +// Set is a wrapper for setting the value of a key in the underlying map +func (ss *Store) Set(clientID uuid.UUID, val subscriber.Subscriber) { + ss.Lock() + defer ss.Unlock() + ss.Store[clientID] = &val +} + +// Get is a wrapper for Getting the value of a key in the underlying map +func (ss *Store) Get(clientID uuid.UUID) (subscriber.Subscriber, bool) { + ss.Lock() + defer ss.Unlock() + if s, ok := ss.Store[clientID]; ok { + return *s, true + } + return subscriber.Subscriber{}, false +} + +// Delete ... delete from store +func (ss *Store) Delete(clientID uuid.UUID) { + ss.Lock() + defer ss.Unlock() + delete(ss.Store, clientID) +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go new file mode 100644 index 0000000..002f978 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/docs.go @@ -0,0 +1 @@ +package subscriber diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go new file mode 100644 index 0000000..3c8f3fd --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go @@ -0,0 +1,112 @@ +package subscriber + +// Copyright 2022 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import ( + "strings" + "sync" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/channel" + + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" + "github.com/redhat-cne/sdk-go/pkg/types" +) + +// Status of the client connections +type Status int64 + +const ( + SetConnectionToFailAfter = 10 +) +const ( + // InActive client + InActive Status = iota + // Active Client + Active +) + +// Subscriber object holds client connections +type Subscriber struct { + // ClientID of the sub + // +required + ClientID uuid.UUID `json:"clientID" omit:"empty"` + // +required + SubStore *store.PubSubStore `json:"subStore" omit:"empty"` + // EndPointURI - A URI describing the subscriber link . + // +required + EndPointURI *types.URI `json:"endPointURI" omit:"empty"` + // Status ... + Status Status `json:"status" omit:"empty"` + // Action ... + Action channel.Status `json:"action" omit:"empty"` + // FailedCount ... + failedCount int +} + +// Get ... get pubsub +func (s *Subscriber) Get(subID string) pubsub.PubSub { + return s.SubStore.Get(subID) +} + +// IncFailCount ... +func (s *Subscriber) IncFailCount() { + s.failedCount++ + if s.failedCount >= SetConnectionToFailAfter { + s.Action = channel.DELETE + s.Status = InActive + } +} + +// ResetFailCount ... +func (s *Subscriber) ResetFailCount() { + s.failedCount = 0 +} + +func (s *Subscriber) FailedCount() int { + return s.failedCount +} + +// String returns a pretty-printed representation of the Event. +func (s *Subscriber) String() string { + b := strings.Builder{} + b.WriteString(" EndPointURI: " + s.GetEndPointURI() + "\n") + b.WriteString(" ID: " + s.GetClientID().String() + "\n") + b.WriteString(" sub :{") + if s.SubStore != nil { + for _, v := range s.SubStore.Store { + b.WriteString(" {") + b.WriteString(v.String() + "\n") + b.WriteString(" },") + } + } + b.WriteString(" }") + return b.String() +} + +// New create new subscriber +func New(clientID uuid.UUID) *Subscriber { + return &Subscriber{ + ClientID: clientID, + SubStore: &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: map[string]*pubsub.PubSub{}, + }, + EndPointURI: nil, + Status: 0, + } +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go new file mode 100644 index 0000000..7866033 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_interface.go @@ -0,0 +1,51 @@ +package subscriber + +import ( + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" +) + +// Copyright 2022 The Cloud Native Events Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Reader is the interface for reading through an event from attributes. +type Reader interface { + // GetClientID returns clientID + GetClientID() uuid.UUID + // GetStatus Get Status of the subscribers + GetStatus() Status + // String returns a pretty-printed representation of the PubSub. + String() string + // GetSubStore return pubsub data + GetSubStore() *store.PubSubStore + // GetEndPointURI EndPointURI return endpoint + GetEndPointURI() string +} + +// Writer is the interface for writing through an event onto attributes. +// If an error is thrown by a subcomponent, Writer caches the error +// internally and exposes errors with a call to Writer.Validate(). +type Writer interface { + // SetClientID Resource performs event.SetResource() + SetClientID(clientID uuid.UUID) + + // SetStatus SetID performs event.SetID. + SetStatus(status Status) + + // SetEndPointURI SetHealthEndPoint set health endpoint + SetEndPointURI(url string) error + + AddSubscription(sub ...pubsub.PubSub) +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go new file mode 100644 index 0000000..98bcf43 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go @@ -0,0 +1,44 @@ +package subscriber + +import ( + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/store" +) + +var _ Reader = (*Subscriber)(nil) + +// GetClientID ... Get subscriber ClientID +func (s *Subscriber) GetClientID() uuid.UUID { + return s.ClientID +} + +// GetEndPointURI EndPointURI returns uri location +func (s *Subscriber) GetEndPointURI() string { + return s.EndPointURI.String() +} + +// GetStatus of the client connection +func (s *Subscriber) GetStatus() Status { + return s.Status +} + +// GetSubStore get subscription store +func (s *Subscriber) GetSubStore() *store.PubSubStore { + return s.SubStore +} + +// CreateCloudEvents ... +func (s *Subscriber) CreateCloudEvents() (*cloudevents.Event, error) { + ce := cloudevents.NewEvent(cloudevents.VersionV03) + ce.SetDataContentType(cloudevents.ApplicationJSON) + ce.SetSpecVersion(cloudevents.VersionV03) + ce.SetType(channel.SUBSCRIBER.String()) + ce.SetSource("subscription-request") + ce.SetID(uuid.New().String()) + if err := ce.SetData(cloudevents.ApplicationJSON, s); err != nil { + return nil, err + } + return &ce, nil +} diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go new file mode 100644 index 0000000..5b9bad9 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go @@ -0,0 +1,53 @@ +package subscriber + +import ( + "fmt" + "net/url" + "strings" + + "github.com/google/uuid" + + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/types" +) + +var _ Writer = (*Subscriber)(nil) + +// SetClientID ... +func (s *Subscriber) SetClientID(clientID uuid.UUID) { + s.ClientID = clientID +} + +// SetEndPointURI set uri location (return url) +func (s *Subscriber) SetEndPointURI(endPointURI string) error { + sEndPointURI := strings.TrimSpace(endPointURI) + if sEndPointURI == "" { + s.EndPointURI = nil + err := fmt.Errorf("EndPointURI is given empty string,should be valid url") + return err + } + endPointURL, err := url.Parse(sEndPointURI) + if err != nil { + return err + } + s.EndPointURI = &types.URI{URL: *endPointURL} + return nil +} + +// SetStatus set status of the connection +func (s *Subscriber) SetStatus(status Status) { + s.Status = status +} + +// AddSubscription ... +func (s *Subscriber) AddSubscription(subs ...pubsub.PubSub) { + for _, ss := range subs { + newS := &pubsub.PubSub{ + ID: ss.GetID(), + EndPointURI: nil, + URILocation: nil, + Resource: ss.Resource, + } + s.SubStore.Store[ss.GetID()] = newS + } +} diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go new file mode 100644 index 0000000..002f978 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/docs.go @@ -0,0 +1 @@ +package subscriber diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go new file mode 100644 index 0000000..cb3c4c2 --- /dev/null +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go @@ -0,0 +1,479 @@ +package subscriber + +import ( + "encoding/json" + "fmt" + "io" + "os" + "sync" + + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/channel" + "github.com/redhat-cne/sdk-go/pkg/types" + + "github.com/pkg/errors" + "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/store" + + log "github.com/sirupsen/logrus" + + SubscriberStore "github.com/redhat-cne/sdk-go/pkg/store/subscriber" + "github.com/redhat-cne/sdk-go/pkg/subscriber" +) + +// API ... api methods for publisher subscriber +type API struct { + SubscriberStore *SubscriberStore.Store // each client will have one store + storeFilePath string // subscribers + transportEnabled bool // http is enabled +} + +var instance *API +var once sync.Once +var mu sync.Mutex + +// NewSubscriber create new subscribers connections +func NewSubscriber(clientID uuid.UUID) subscriber.Subscriber { + return subscriber.Subscriber{ + ClientID: clientID, + SubStore: &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: nil, + }, + EndPointURI: nil, + Status: 1, + } +} + +// New creates empty publisher or subscriber +func New() subscriber.Subscriber { + return subscriber.Subscriber{} +} + +// GetAPIInstance get event instance +func GetAPIInstance(storeFilePath string) *API { + once.Do(func() { + instance = &API{ + transportEnabled: true, + SubscriberStore: &SubscriberStore.Store{ + RWMutex: sync.RWMutex{}, + Store: map[uuid.UUID]*subscriber.Subscriber{}, + }, + storeFilePath: storeFilePath, + } + hasDir(storeFilePath) + instance.ReloadStore() + }) + return instance +} + +// ReloadStore reload store if there is any change or refresh is required +func (p *API) ReloadStore() { + // load for file + log.Infof("reloading subscribers from the store %s", p.storeFilePath) + if files, err := loadFileNamesFromDir(p.storeFilePath); err == nil { + for _, f := range files { + if b, err1 := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, f)); err1 == nil { + if len(b) > 0 { + var sub subscriber.Subscriber + var err2 error + if err2 = json.Unmarshal(b, &sub); err2 == nil { + p.SubscriberStore.Set(sub.ClientID, sub) + } else { + log.Errorf("error parsing subscriber %s \n %s", string(b), err2.Error()) + } + } + } + } + } + log.Infof("%d registered clients reloaded", len(p.SubscriberStore.Store)) + for k, v := range p.SubscriberStore.Store { + log.Infof("registered clients %s : %s", k, v.String()) + } +} + +// HasTransportEnabled ... +func (p *API) HasTransportEnabled() bool { + return p.transportEnabled +} + +// DisableTransport ... +func (p *API) DisableTransport() { + p.transportEnabled = false +} + +// EnableTransport ... +func (p *API) EnableTransport() { + p.transportEnabled = true +} + +// ClientCount .. client cound +func (p *API) ClientCount() int { + return len(p.SubscriberStore.Store) +} + +// GetSubFromSubscriptionsStore get data from publisher store +func (p *API) GetSubFromSubscriptionsStore(clientID uuid.UUID, address string) (pubsub.PubSub, error) { + if subscriber, ok := p.HasClient(clientID); ok { + for _, sub := range subscriber.SubStore.Store { + if sub.GetResource() == address { + return pubsub.PubSub{ + ID: sub.ID, + EndPointURI: sub.EndPointURI, + URILocation: sub.URILocation, + Resource: sub.Resource, + }, nil + } + } + } + + return pubsub.PubSub{}, fmt.Errorf("publisher not found for address %s", address) +} + +// HasSubscription check if the subscriptionOne is already exists in the store/cache +func (p *API) HasSubscription(clientID uuid.UUID, address string) (pubsub.PubSub, bool) { + if sub, err := p.GetSubFromSubscriptionsStore(clientID, address); err == nil { + return sub, true + } + return pubsub.PubSub{}, false +} + +// HasClient check if client is already exists in the store/cache +func (p *API) HasClient(clientID uuid.UUID) (*subscriber.Subscriber, bool) { + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + return &subscriber, true + } + return nil, false +} + +// CreateSubscription create a subscriptionOne and store it in a file and cache +func (p *API) CreateSubscription(clientID uuid.UUID, sub subscriber.Subscriber) (subscriptionClient *subscriber.Subscriber, err error) { + var ok bool + if subscriptionClient, ok = p.HasClient(clientID); !ok { + subscriptionClient = subscriber.New(clientID) + } + subscriptionClient.ResetFailCount() + _ = subscriptionClient.SetEndPointURI(sub.GetEndPointURI()) + subscriptionClient.SetStatus(subscriber.Active) + subscriptionClient.Action = channel.NEW + pubStore := subscriptionClient.GetSubStore() + var hasResource bool + for key, value := range sub.SubStore.Store { + hasResource = false + for _, s := range pubStore.Store { + if s.Resource == value.Resource { + hasResource = true + continue + } + } + if !hasResource { + if key == "" { + key = uuid.New().String() + } + subscriptionClient.SubStore.Set(key, *value) + } + } + p.SubscriberStore.Set(clientID, *subscriptionClient) + // persist the subscriptionOne - + //TODO:might want to use PVC to live beyond pod crash + err = writeToFile(*subscriptionClient, fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + if err != nil { + log.Errorf("error writing to a store %v\n", err) + return nil, err + } + log.Infof("subscription persisted into a file %s", fmt.Sprintf("%s/%s - content %s", p.storeFilePath, fmt.Sprintf("%s.json", clientID), subscriptionClient.String())) + // store the publisher + + return subscriptionClient, nil +} + +// GetSubscriptionClient get a clientID by id +func (p *API) GetSubscriptionClient(clientID uuid.UUID) (subscriber.Subscriber, error) { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + return subs, nil + } + return subscriber.Subscriber{}, fmt.Errorf("subscriber data was not found for id %s", clientID) +} + +// GetSubscriptionsFromFile get subscriptions data from the file store +func (p *API) GetSubscriptionsFromFile(clientID uuid.UUID) ([]byte, error) { + b, err := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID.String()))) + return b, err +} + +// GetSubscriptions get all subscriptionOne inforamtions +func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + sub = subs.SubStore.Store + } + + return +} + +// GetSubscription get subscriptionOne inforamtions +func (p *API) GetSubscription(clientID uuid.UUID, subID string) pubsub.PubSub { + if subs, ok := p.SubscriberStore.Get(clientID); ok { + return subs.Get(subID) + } + return pubsub.PubSub{} +} + +// GetSubscriberURLByResourceAndClientID get subscription information by client id/resource +func (p *API) GetSubscriberURLByResourceAndClientID(clientID uuid.UUID, resource string) (url *string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + if subs.ClientID == clientID { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + return func(s string) *string { + return &s + }(subs.GetEndPointURI()) + } + } + } + } + return nil +} + +// GetSubscriberURLByResource get subscriptionOne information +func (p *API) GetSubscriberURLByResource(resource string) (urls []string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + urls = append(urls, subs.GetEndPointURI()) + } + } + } + return urls +} + +// GetClientIDByResource get subscriptionOne information +func (p *API) GetClientIDByResource(resource string) (clientIDs []uuid.UUID) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + clientIDs = append(clientIDs, subs.ClientID) + } + } + } + return clientIDs +} + +// GetClientIDAddressByResource get subscriptionOne information +func (p *API) GetClientIDAddressByResource(resource string) map[uuid.UUID]*types.URI { + clients := map[uuid.UUID]*types.URI{} + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + for _, subs := range p.SubscriberStore.Store { + for _, sub := range subs.SubStore.Store { + if sub.GetResource() == resource { + clients[subs.ClientID] = subs.EndPointURI + } + } + } + return clients +} + +// DeleteSubscription delete a subscriptionOne by id +func (p *API) DeleteSubscription(clientID uuid.UUID, subscriptionID string) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { // client found + if sub, ok2 := subStore.SubStore.Store[subscriptionID]; ok2 { + err := deleteFromFile(*sub, fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + subStore.SubStore.Delete(subscriptionID) + p.SubscriberStore.Set(clientID, subStore) + return err + } + } + return nil +} + +// DeleteAllSubscriptions delete all subscriptionOne information +func (p *API) DeleteAllSubscriptions(clientID uuid.UUID) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { + return err + } + subStore.SubStore = &store.PubSubStore{ + RWMutex: sync.RWMutex{}, + Store: map[string]*pubsub.PubSub{}, + } + p.SubscriberStore.Set(clientID, subStore) + } + return nil +} + +// DeleteClient delete all subscriptionOne information +func (p *API) DeleteClient(clientID uuid.UUID) error { + if _, ok := p.SubscriberStore.Get(clientID); ok { // client found + log.Infof("delete from file %s", + fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) + if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { + return err + } + p.SubscriberStore.Delete(clientID) + } else { + log.Infof("subscription for client id %s not found", clientID) + } + return nil +} + +// UpdateStatus .. update status +func (p *API) UpdateStatus(clientID uuid.UUID, status subscriber.Status) error { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + subStore.SetStatus(status) + p.SubscriberStore.Set(clientID, subStore) + // do not write to file , if restarts it will consider all client are active + } else { + return errors.New("failed to update subscriber status") + } + return nil +} + +// IncFailCountToFail .. update fail count +func (p *API) IncFailCountToFail(clientID uuid.UUID) bool { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + subStore.IncFailCount() + p.SubscriberStore.Set(clientID, subStore) + if subStore.Action == channel.DELETE { + return true + } + } + return false +} + +// FailCountThreshold .. get threshold +func (p *API) FailCountThreshold() int { + return subscriber.SetConnectionToFailAfter +} + +func (p *API) GetFailCount(clientID uuid.UUID) int { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + return subStore.FailedCount() + } + return 0 +} + +// SubscriberMarkedForDelete ... +func (p *API) SubscriberMarkedForDelete(clientID uuid.UUID) bool { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + if subStore.Action == channel.DELETE { + return true + } + } + return false +} + +// deleteAllFromFile deletes publisher and subscriptionOne information from the file system +func deleteAllFromFile(filePath string) error { + return os.Remove(filePath) +} + +// DeleteFromFile is used to delete subscriptionOne from the file system +func deleteFromFile(sub pubsub.PubSub, filePath string) error { + var persistedSubClient subscriber.Subscriber + //open file + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err := io.ReadAll(file) + if err != nil { + return err + } + + if len(b) > 0 { + err = json.Unmarshal(b, &persistedSubClient) + if err != nil { + return err + } + } + delete(persistedSubClient.SubStore.Store, sub.ID) + + newBytes, err := json.MarshalIndent(&persistedSubClient, "", " ") + if err != nil { + log.Errorf("error deleting sub %v", err) + return err + } + return os.WriteFile(filePath, newBytes, 0666) +} + +// loadFromFile is used to read subscriptionOne/publisher from the file system +func loadFromFile(filePath string) (b []byte, err error) { + //open file + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err = io.ReadAll(file) + if err != nil { + return nil, err + } + return b, nil +} + +func loadFileNamesFromDir(filePath string) (subFiles []string, err error) { + files, err := os.ReadDir(filePath) + if err != nil { + return subFiles, err + } + for _, file := range files { + if !file.IsDir() { + subFiles = append(subFiles, file.Name()) + } + } + return +} + +// writeToFile writes subscriptionOne data to a file +func writeToFile(subscriberClient subscriber.Subscriber, filePath string) error { + //open file + mu.Lock() + defer mu.Unlock() + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return err + } + defer file.Close() + //read file and unmarshall json file to slice of users + b, err := io.ReadAll(file) + if err != nil { + return err + } + + var persistedSubClient subscriber.Subscriber + if len(b) > 0 { + err = json.Unmarshal(b, &persistedSubClient) + if err != nil { + return err + } + } else { + persistedSubClient = *subscriber.New(subscriberClient.ClientID) + } // no file found + _ = persistedSubClient.SetEndPointURI(subscriberClient.GetEndPointURI()) + persistedSubClient.SetStatus(subscriber.Active) + for subID, sub := range subscriberClient.SubStore.Store { + persistedSubClient.SubStore.Store[subID] = sub + } + + newBytes, err := json.MarshalIndent(&persistedSubClient, "", " ") + if err != nil { + return err + } + log.Infof("persisting following contents %s to a file %s\n", string(newBytes), filePath) + return os.WriteFile(filePath, newBytes, 0666) +} + +func hasDir(path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + _ = os.Mkdir(path, 0700) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e43282a..7ced906 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -48,6 +48,9 @@ github.com/modern-go/concurrent # github.com/modern-go/reflect2 v1.0.2 ## explicit; go 1.12 github.com/modern-go/reflect2 +# github.com/pkg/errors v0.9.1 +## explicit +github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.0 ## explicit github.com/pmezard/go-difflib/difflib @@ -76,11 +79,14 @@ github.com/redhat-cne/sdk-go/pkg/event/ptp github.com/redhat-cne/sdk-go/pkg/event/redfish github.com/redhat-cne/sdk-go/pkg/pubsub github.com/redhat-cne/sdk-go/pkg/store +github.com/redhat-cne/sdk-go/pkg/store/subscriber +github.com/redhat-cne/sdk-go/pkg/subscriber github.com/redhat-cne/sdk-go/pkg/types github.com/redhat-cne/sdk-go/pkg/util/clock github.com/redhat-cne/sdk-go/pkg/util/wait github.com/redhat-cne/sdk-go/v1/event github.com/redhat-cne/sdk-go/v1/pubsub +github.com/redhat-cne/sdk-go/v1/subscriber # github.com/rogpeppe/go-internal v1.10.0 ## explicit; go 1.19 # github.com/sirupsen/logrus v1.8.1