Skip to content

Commit

Permalink
Set Event Source according to O-RAN Spec
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding committed Jul 16, 2024
1 parent eecb75b commit 3561d54
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 h1:TnnP33wqdtZ4GCp8WYHVFVywWxrcGonc0ijGCpfqTdU=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6 h1:TYW1M1qUC51tN5nIy3cRwNFDn/M7E81jbtIthzyTwQ8=
github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
2 changes: 2 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ func Test_MultiplePost(t *testing.T) {
}

func TestServer_End(*testing.T) {
os.Remove("pub.json")
os.Remove("sub.json")
close(eventOutCh)
close(closeCh)
}
Expand Down
9 changes: 4 additions & 5 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
respondWithStatusCode(w, http.StatusNotFound, "event not found")
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
log.Error("event not found")
return
}

restClient := restclient.New()
Expand Down Expand Up @@ -400,7 +401,7 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID))
return
}
ceEvent, err := cneEvent.NewCloudEventV2(&pub)
ceEvent, err := cneEvent.NewCloudEventV2()
if err != nil {
localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1)
respondWithError(w, err.Error())
Expand Down Expand Up @@ -467,7 +468,6 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
}

e, _ := out.CreateCloudEvents(CURRENTSTATE)
e.SetSource(resourceAddress)
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
Expand Down Expand Up @@ -501,10 +501,9 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{

Version: "v1",
Version: cne.APISchemaVersion,
})
ceEvent, err := cneEvent.NewCloudEventV2(&sub)
ceEvent, err := cneEvent.NewCloudEventV2()

if err != nil {
respondWithError(w, err.Error())
Expand Down
5 changes: 5 additions & 0 deletions v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,8 @@ func (s *Server) Shutdown() {
func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) {
s.statusReceiveOverrideFn = fn
}

// GetSubscriberAPI ...
func (s *Server) GetSubscriberAPI() *subscriberApi.API {
return s.subscriberAPI
}
136 changes: 86 additions & 50 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,40 @@ var (
eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port = 8989
port = 8990
apHost = "localhost"
apPath = "/routes/cne/v1/"
apPath = "/api/ocloudNotifications/v2/"
resource = "test/test"
resourceNoneSubscribed = "test/nonesubscribed"
storePath = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
)

func onReceiveOverrideFn(_ cloudevents.Event, d *channel.DataChan) error {
data := &event.Data{
Version: event.APISchemaVersion,
Values: []event.DataValue{},
}
ce := cloudevents.NewEvent(cloudevents.VersionV1)
ce.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
ce.SetType("dummyType")
ce.SetSource("dummySource")
ce.SetSpecVersion(cloudevents.VersionV1)
ce.SetID(uuid.New().String())
ce.SetData("", *data) //nolint:errcheck
d.Data = &ce

return nil
}

func init() {
eventOutCh = make(chan *channel.DataChan, 10)
closeCh = make(chan struct{})
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil)
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, onReceiveOverrideFn)
//start http server
server.Start()

Expand All @@ -89,7 +106,7 @@ func TestMain(m *testing.M) {
},
},
}
data.SetVersion("v1") //nolint:errcheck
data.SetVersion("1.0") //nolint:errcheck
cneEvent.SetData(data)
e := cloudevents.Event{
Context: cloudevents.EventContextV1{
Expand Down Expand Up @@ -154,8 +171,7 @@ func TestServer_CreateSubscription(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "1.0")

resource, "2.0")
data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
Expand Down Expand Up @@ -184,6 +200,31 @@ func TestServer_CreateSubscription(t *testing.T) {
log.Infof("Subscription:\n%s", ObjSub.String())
}

func TestServer_CreateSubscriptionConflict(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "2.0")
data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
/// create new subscription
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
assert.Nil(t, err)
bodyString := string(bodyBytes)
log.Print(bodyString)
assert.Equal(t, http.StatusConflict, resp.StatusCode)
}

func TestServer_GetSubscription(t *testing.T) {
// Get Just Created Subscription
req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
Expand Down Expand Up @@ -293,58 +334,17 @@ func TestServer_ListPublishers(t *testing.T) {
}

func TestServer_GetCurrentState_withSubscription(t *testing.T) {
// create subscription
sub := api.NewPubSub(
&types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
resource, "1.0")

data, err := json.Marshal(&sub)
assert.Nil(t, err)
assert.NotNil(t, data)
/// create new subscription
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), bytes.NewBuffer(data))
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
assert.Nil(t, err)
bodyString := string(bodyBytes)
log.Print(bodyString)
assert.Equal(t, http.StatusCreated, resp.StatusCode)
err = json.Unmarshal(bodyBytes, &ObjSub)
assert.Nil(t, err)
assert.NotEmpty(t, ObjSub.ID)
assert.NotEmpty(t, ObjSub.URILocation)
assert.NotEmpty(t, ObjSub.EndPointURI)
assert.NotEmpty(t, ObjSub.Resource)
assert.Equal(t, sub.Resource, ObjSub.Resource)
log.Infof("Subscription:\n%s", ObjSub.String())

// try getting event
time.Sleep(2 * time.Second)
req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, ObjSub.Resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err = server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
s, err2 := io.ReadAll(resp.Body)
assert.Nil(t, err2)
log.Infof("tedt %s ", string(s))
assert.Equal(t, http.StatusOK, resp.StatusCode)

// Delete All Subscriptions
req, err = http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "subscriptions"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err = server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
}

func TestServer_GetCurrentState_withoutSubscription(t *testing.T) {
Expand Down Expand Up @@ -377,6 +377,37 @@ func TestServer_TestPingStatusStatusCode(t *testing.T) {
}

func TestServer_DeleteSubscription(t *testing.T) {
clientIDs := server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
defer resp.Body.Close()
// clean up files on disk
for _, clientID := range clientIDs {
os.Remove(fmt.Sprintf("%s.json", clientID))
}
}

func TestServer_DeleteSubscriptionNotFound(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "DELETE", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, "subscriptions", ObjSub.ID), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
defer resp.Body.Close()
}

func TestServer_DeleteAllSubscriptions(t *testing.T) {
// Delete All Subscriptions
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -410,7 +441,7 @@ func TestServer_GetNonExistingPublisher(t *testing.T) {
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Nil(t, err)
}

Expand Down Expand Up @@ -529,7 +560,7 @@ func Test_MultiplePost(t *testing.T) {
},
},
}
data.SetVersion("v1") //nolint:errcheck
data.SetVersion("1.0") //nolint:errcheck
cneEvent.SetData(data)
for i := 0; i < 5; i++ {
go publishEvent(cneEvent)
Expand All @@ -538,6 +569,11 @@ func Test_MultiplePost(t *testing.T) {
}

func TestServer_End(*testing.T) {
for _, clientID := range server.GetSubscriberAPI().GetClientIDByResource(ObjSub.Resource) {
os.Remove(fmt.Sprintf("%s.json", clientID))
}
os.Remove("pub.json")
os.Remove("sub.json")
close(eventOutCh)
close(closeCh)
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/prometheus/common/model
github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util
# github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
# github.com/redhat-cne/sdk-go v1.0.1-0.20240716153735-19a18fd38ee6
## explicit; go 1.22
github.com/redhat-cne/sdk-go/pkg/channel
github.com/redhat-cne/sdk-go/pkg/common
Expand Down

0 comments on commit 3561d54

Please sign in to comment.