diff --git a/go.mod b/go.mod index 3a87baa..721f30b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/prometheus/client_golang v1.14.0 - github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 + github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.7.0 diff --git a/go.sum b/go.sum index 8e618d8..5b07b28 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 h1:TnnP33wqdtZ4GCp8WYHVFVywWxrcGonc0ijGCpfqTdU= -github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM= +github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6 h1:TYW1M1qUC51tN5nIy3cRwNFDn/M7E81jbtIthzyTwQ8= +github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= diff --git a/server_test.go b/server_test.go index a620c00..520d3dd 100644 --- a/server_test.go +++ b/server_test.go @@ -537,6 +537,8 @@ func Test_MultiplePost(t *testing.T) { } func TestServer_End(*testing.T) { + os.Remove("pub.json") + os.Remove("sub.json") close(eventOutCh) close(closeCh) } diff --git a/v2/routes.go b/v2/routes.go index 89706d8..c6a414c 100644 --- a/v2/routes.go +++ b/v2/routes.go @@ -109,6 +109,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { respondWithStatusCode(w, http.StatusNotFound, "event not found") localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) log.Error("event not found") + return } restClient := restclient.New() @@ -400,7 +401,7 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) { respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID)) return } - ceEvent, err := cneEvent.NewCloudEventV2(&pub) + ceEvent, err := cneEvent.NewCloudEventV2() if err != nil { localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1) respondWithError(w, err.Error()) @@ -467,7 +468,6 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { } e, _ := out.CreateCloudEvents(CURRENTSTATE) - e.SetSource(resourceAddress) // statusReceiveOverrideFn must return value for if s.statusReceiveOverrideFn != nil { if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { @@ -501,10 +501,9 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) cneEvent.SetDataContentType(cloudevents.ApplicationJSON) cneEvent.SetData(cne.Data{ - - Version: "v1", + Version: cne.APISchemaVersion, }) - ceEvent, err := cneEvent.NewCloudEventV2(&sub) + ceEvent, err := cneEvent.NewCloudEventV2() if err != nil { respondWithError(w, err.Error()) diff --git a/v2/server.go b/v2/server.go index 37678f9..fc459be 100644 --- a/v2/server.go +++ b/v2/server.go @@ -388,3 +388,8 @@ func (s *Server) Shutdown() { func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) { s.statusReceiveOverrideFn = fn } + +// GetSubscriberAPI ... +func (s *Server) GetSubscriberAPI() *subscriberApi.API { + return s.subscriberAPI +} diff --git a/v2/server_test.go b/v2/server_test.go index 955a93f..c8bf01a 100644 --- a/v2/server_test.go +++ b/v2/server_test.go @@ -49,9 +49,9 @@ var ( eventOutCh chan *channel.DataChan closeCh chan struct{} wg sync.WaitGroup - port = 8989 + port = 8990 apHost = "localhost" - apPath = "/routes/cne/v1/" + apPath = "/api/ocloudNotifications/v2/" resource = "test/test" resourceNoneSubscribed = "test/nonesubscribed" storePath = "." @@ -59,13 +59,30 @@ var ( ObjPub pubsub.PubSub ) +func onReceiveOverrideFn(_ cloudevents.Event, d *channel.DataChan) error { + data := &event.Data{ + Version: event.APISchemaVersion, + Values: []event.DataValue{}, + } + ce := cloudevents.NewEvent(cloudevents.VersionV1) + ce.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) + ce.SetType("dummyType") + ce.SetSource("dummySource") + ce.SetSpecVersion(cloudevents.VersionV1) + ce.SetID(uuid.New().String()) + ce.SetData("", *data) //nolint:errcheck + d.Data = &ce + + return nil +} + func init() { eventOutCh = make(chan *channel.DataChan, 10) closeCh = make(chan struct{}) } func TestMain(m *testing.M) { - server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil) + server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, onReceiveOverrideFn) //start http server server.Start() @@ -89,7 +106,7 @@ func TestMain(m *testing.M) { }, }, } - data.SetVersion("v1") //nolint:errcheck + data.SetVersion("1.0") //nolint:errcheck cneEvent.SetData(data) e := cloudevents.Event{ Context: cloudevents.EventContextV1{ @@ -154,8 +171,7 @@ func TestServer_CreateSubscription(t *testing.T) { // create subscription sub := api.NewPubSub( &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource, "1.0") - + resource, "2.0") data, err := json.Marshal(&sub) assert.Nil(t, err) assert.NotNil(t, data) @@ -184,6 +200,31 @@ func TestServer_CreateSubscription(t *testing.T) { log.Infof("Subscription:\n%s", ObjSub.String()) } +func TestServer_CreateSubscriptionConflict(t *testing.T) { + // create subscription + sub := api.NewPubSub( + &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, + resource, "2.0") + data, err := json.Marshal(&sub) + assert.Nil(t, err) + assert.NotNil(t, data) + /// create new subscription + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data)) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + assert.Nil(t, err) + bodyString := string(bodyBytes) + log.Print(bodyString) + assert.Equal(t, http.StatusConflict, resp.StatusCode) +} + func TestServer_GetSubscription(t *testing.T) { // Get Just Created Subscription req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil) @@ -293,58 +334,17 @@ func TestServer_ListPublishers(t *testing.T) { } func TestServer_GetCurrentState_withSubscription(t *testing.T) { - // create subscription - sub := api.NewPubSub( - &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}}, - resource, "1.0") - - data, err := json.Marshal(&sub) - assert.Nil(t, err) - assert.NotNil(t, data) - /// create new subscription ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data)) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil) assert.Nil(t, err) req.Header.Set("Content-Type", "application/json") resp, err := server.HTTPClient.Do(req) assert.Nil(t, err) defer resp.Body.Close() - bodyBytes, err := io.ReadAll(resp.Body) - assert.Nil(t, err) - bodyString := string(bodyBytes) - log.Print(bodyString) - assert.Equal(t, http.StatusCreated, resp.StatusCode) - err = json.Unmarshal(bodyBytes, &ObjSub) - assert.Nil(t, err) - assert.NotEmpty(t, ObjSub.ID) - assert.NotEmpty(t, ObjSub.URILocation) - assert.NotEmpty(t, ObjSub.EndPointURI) - assert.NotEmpty(t, ObjSub.Resource) - assert.Equal(t, sub.Resource, ObjSub.Resource) - log.Infof("Subscription:\n%s", ObjSub.String()) - - // try getting event - time.Sleep(2 * time.Second) - req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil) - assert.Nil(t, err) - req.Header.Set("Content-Type", "application/json") - resp, err = server.HTTPClient.Do(req) - assert.Nil(t, err) - defer resp.Body.Close() s, err2 := io.ReadAll(resp.Body) assert.Nil(t, err2) log.Infof("tedt %s ", string(s)) assert.Equal(t, http.StatusOK, resp.StatusCode) - - // Delete All Subscriptions - req, err = http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil) - assert.Nil(t, err) - req.Header.Set("Content-Type", "application/json") - resp, err = server.HTTPClient.Do(req) - assert.Nil(t, err) - defer resp.Body.Close() } func TestServer_GetCurrentState_withoutSubscription(t *testing.T) { @@ -377,6 +377,37 @@ func TestServer_TestPingStatusStatusCode(t *testing.T) { } func TestServer_DeleteSubscription(t *testing.T) { + clientIDs := server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + assert.Equal(t, http.StatusNoContent, resp.StatusCode) + defer resp.Body.Close() + // clean up files on disk + for _, clientID := range clientIDs { + os.Remove(fmt.Sprintf("%s.json", clientID)) + } +} + +func TestServer_DeleteSubscriptionNotFound(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.HTTPClient.Do(req) + assert.Nil(t, err) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + defer resp.Body.Close() +} + +func TestServer_DeleteAllSubscriptions(t *testing.T) { // Delete All Subscriptions ctx := context.Background() ctx, cancel := context.WithCancel(ctx) @@ -410,7 +441,7 @@ func TestServer_GetNonExistingPublisher(t *testing.T) { resp, err := server.HTTPClient.Do(req) assert.Nil(t, err) defer resp.Body.Close() - assert.Equal(t, resp.StatusCode, http.StatusBadRequest) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) assert.Nil(t, err) } @@ -529,7 +560,7 @@ func Test_MultiplePost(t *testing.T) { }, }, } - data.SetVersion("v1") //nolint:errcheck + data.SetVersion("1.0") //nolint:errcheck cneEvent.SetData(data) for i := 0; i < 5; i++ { go publishEvent(cneEvent) @@ -538,6 +569,11 @@ func Test_MultiplePost(t *testing.T) { } func TestServer_End(*testing.T) { + for _, clientID := range server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource) { + os.Remove(fmt.Sprintf("%s.json", clientID)) + } + os.Remove("pub.json") + os.Remove("sub.json") close(eventOutCh) close(closeCh) } diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go index 228b5a2..a5dd91a 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go @@ -40,11 +40,11 @@ func (e *Event) NewCloudEvent(ps *pubsub.PubSub) (*cloudevent.Event, error) { } // NewCloudEvent create new cloud event from cloud native events and pubsub -func (e *Event) NewCloudEventV2(ps *pubsub.PubSub) (*cloudevent.Event, error) { +func (e *Event) NewCloudEventV2() (*cloudevent.Event, error) { ce := cloudevent.NewEvent(cloudevent.VersionV1) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetSource(ps.Resource) // bus address + ce.SetSource(e.Source) ce.SetSpecVersion(cloudevent.VersionV1) ce.SetID(uuid.New().String()) if err := ce.SetData("", e.GetData()); err != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index e9a7b74..c846c50 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -71,7 +71,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 +# github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6 ## explicit; go 1.22 github.com/redhat-cne/sdk-go/pkg/channel github.com/redhat-cne/sdk-go/pkg/common