Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Event Source according to O-RAN Spec #72

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 h1:TnnP33wqdtZ4GCp8WYHVFVywWxrcGonc0ijGCpfqTdU=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6 h1:TYW1M1qUC51tN5nIy3cRwNFDn/M7E81jbtIthzyTwQ8=
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
2 changes: 2 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ func Test_MultiplePost(t *testing.T) {
}

func TestServer_End(*testing.T) {
os.Remove("pub.json")
os.Remove("sub.json")
close(eventOutCh)
close(closeCh)
}
Expand Down
9 changes: 4 additions & 5 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
respondWithStatusCode(w, http.StatusNotFound, "event not found")
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
log.Error("event not found")
return
}

restClient := restclient.New()
Expand Down Expand Up @@ -400,7 +401,7 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID))
return
}
ceEvent, err := cneEvent.NewCloudEventV2(&pub)
ceEvent, err := cneEvent.NewCloudEventV2()
if err != nil {
localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1)
respondWithError(w, err.Error())
Expand Down Expand Up @@ -467,7 +468,6 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
}

e, _ := out.CreateCloudEvents(CURRENTSTATE)
e.SetSource(resourceAddress)
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
Expand Down Expand Up @@ -501,10 +501,9 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{

Version: "v1",
Version: cne.APISchemaVersion,
})
ceEvent, err := cneEvent.NewCloudEventV2(&sub)
ceEvent, err := cneEvent.NewCloudEventV2()

if err != nil {
respondWithError(w, err.Error())
Expand Down
5 changes: 5 additions & 0 deletions v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,8 @@ func (s *Server) Shutdown() {
func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) {
s.statusReceiveOverrideFn = fn
}

// GetSubscriberAPI ...
func (s *Server) GetSubscriberAPI() *subscriberApi.API {
return s.subscriberAPI
}
136 changes: 86 additions & 50 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,40 @@ var (
eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port = 8989
port = 8990
apHost = "localhost"
apPath = "/routes/cne/v1/"
apPath = "/api/ocloudNotifications/v2/"
resource = "test/test"
resourceNoneSubscribed = "test/nonesubscribed"
storePath = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
)

func onReceiveOverrideFn(_ cloudevents.Event, d *channel.DataChan) error {
data := &event.Data{
Version: event.APISchemaVersion,
Values: []event.DataValue{},
}
ce := cloudevents.NewEvent(cloudevents.VersionV1)
ce.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
ce.SetType("dummyType")
ce.SetSource("dummySource")
ce.SetSpecVersion(cloudevents.VersionV1)
ce.SetID(uuid.New().String())
ce.SetData("", *data) //nolint:errcheck
d.Data = &ce

return nil
}

func init() {
eventOutCh = make(chan *channel.DataChan, 10)
closeCh = make(chan struct{})
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil)
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, onReceiveOverrideFn)
//start http server
server.Start()

Expand All @@ -89,7 +106,7 @@ func TestMain(m *testing.M) {
},
},
}
data.SetVersion("v1") //nolint:errcheck
data.SetVersion("1.0") //nolint:errcheck
cneEvent.SetData(data)
e := cloudevents.Event{
Context: cloudevents.EventContextV1{
Expand Down Expand Up @@ -154,8 +171,7 @@ func TestServer_CreateSubscription(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "1.0")

resource, "2.0")
data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
Expand Down Expand Up @@ -184,6 +200,31 @@ func TestServer_CreateSubscription(t *testing.T) {
log.Infof("Subscription:\n%s", ObjSub.String())
}

func TestServer_CreateSubscriptionConflict(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "2.0")
data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
/// create new subscription
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
assert.Nil(t, err)
bodyString := string(bodyBytes)
log.Print(bodyString)
assert.Equal(t, http.StatusConflict, resp.StatusCode)
}

func TestServer_GetSubscription(t *testing.T) {
// Get Just Created Subscription
req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
Expand Down Expand Up @@ -293,58 +334,17 @@ func TestServer_ListPublishers(t *testing.T) {
}

func TestServer_GetCurrentState_withSubscription(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "1.0")

data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
/// create new subscription
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data))
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
assert.Nil(t, err)
bodyString := string(bodyBytes)
log.Print(bodyString)
assert.Equal(t, http.StatusCreated, resp.StatusCode)
err = json.Unmarshal(bodyBytes, &ObjSub)
assert.Nil(t, err)
assert.NotEmpty(t, ObjSub.ID)
assert.NotEmpty(t, ObjSub.URILocation)
assert.NotEmpty(t, ObjSub.EndPointURI)
assert.NotEmpty(t, ObjSub.Resource)
assert.Equal(t, sub.Resource, ObjSub.Resource)
log.Infof("Subscription:\n%s", ObjSub.String())

// try getting event
time.Sleep(2 * time.Second)
req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err = server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
s, err2 := io.ReadAll(resp.Body)
assert.Nil(t, err2)
log.Infof("tedt %s ", string(s))
assert.Equal(t, http.StatusOK, resp.StatusCode)

// Delete All Subscriptions
req, err = http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err = server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
}

func TestServer_GetCurrentState_withoutSubscription(t *testing.T) {
Expand Down Expand Up @@ -377,6 +377,37 @@ func TestServer_TestPingStatusStatusCode(t *testing.T) {
}

func TestServer_DeleteSubscription(t *testing.T) {
clientIDs := server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
defer resp.Body.Close()
// clean up files on disk
for _, clientID := range clientIDs {
os.Remove(fmt.Sprintf("%s.json", clientID))
}
}

func TestServer_DeleteSubscriptionNotFound(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
defer resp.Body.Close()
}

func TestServer_DeleteAllSubscriptions(t *testing.T) {
// Delete All Subscriptions
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -410,7 +441,7 @@ func TestServer_GetNonExistingPublisher(t *testing.T) {
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Nil(t, err)
}

Expand Down Expand Up @@ -529,7 +560,7 @@ func Test_MultiplePost(t *testing.T) {
},
},
}
data.SetVersion("v1") //nolint:errcheck
data.SetVersion("1.0") //nolint:errcheck
cneEvent.SetData(data)
for i := 0; i < 5; i++ {
go publishEvent(cneEvent)
Expand All @@ -538,6 +569,11 @@ func Test_MultiplePost(t *testing.T) {
}

func TestServer_End(*testing.T) {
for _, clientID := range server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource) {
os.Remove(fmt.Sprintf("%s.json", clientID))
}
os.Remove("pub.json")
os.Remove("sub.json")
close(eventOutCh)
close(closeCh)
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/prometheus/common/model
github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util
# github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
# github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6
## explicit; go 1.22
github.com/redhat-cne/sdk-go/pkg/channel
github.com/redhat-cne/sdk-go/pkg/common
Expand Down