Skip to content

Commit

Permalink
O-RAN V3 Rest Api: Status Notification
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding committed Jul 12, 2024
1 parent da39815 commit b1356a1
Show file tree
Hide file tree
Showing 18 changed files with 123 additions and 86 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9
github.com/redhat-cne/sdk-go v1.0.1-unpublished
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
)

replace github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 h1:qDOGSHOtHRszd8FnM0GZVUvbIvHhZrw5GeccXYPwT04=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down
67 changes: 33 additions & 34 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
return
}
sub := pubsub.PubSub{}
sub.SetVersion(API_VERSION)
if err = json.Unmarshal(bodyBytes, &sub); err != nil {
respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
Expand All @@ -67,26 +68,18 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok {
clientIDs := s.subscriberAPI.GetClientIDByResource(sub.GetResource())
if len(clientIDs) != 0 {
respondWithStatusCode(w, http.StatusConflict,
fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation",
subExists.GetID()))
fmt.Sprintf("subscription (clientID: %s) with same resource already exists, skipping creation",
clientIDs[0]))
return
}

id := uuid.New().String()
sub.SetID(id)
sub.SetVersion(API_VERSION)
sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck

// TODO: cleanup: local pubsub is no longer needed since we are using configMap
newSub, err := s.pubSubAPI.CreateSubscription(sub)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
addr := newSub.GetResource()
sub.SetURILocation(fmt.Sprintf("http://%s:%d%s%s/%s", s.apiHost, s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck
addr := sub.GetResource()

// this is placeholder not sending back to report
out := channel.DataChan{
Expand Down Expand Up @@ -119,7 +112,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
}

restClient := restclient.New()
out.Data.SetID(newSub.ID) // set ID to the subscriptionID
// make sure event ID is unique
out.Data.SetID(uuid.New().String())
status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data)
if err != nil {
respondWithStatusCode(w, http.StatusBadRequest,
Expand All @@ -139,7 +133,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
subs := subscriber.New(s.getClientIDFromURI(endPointURI))
_ = subs.SetEndPointURI(endPointURI)

subs.AddSubscription(newSub)
subs.AddSubscription(sub)
subs.Action = channel.NEW
cevent, _ := subs.CreateCloudEvents()
cevent.SetSource(addr)
Expand All @@ -162,10 +156,10 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
} else {
out.Status = channel.SUCCESS
_ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj)
_ = out.Data.SetData("", updatedObj)
log.Infof("subscription created successfully.")
localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1)
respondWithJSON(w, http.StatusCreated, newSub)
respondWithJSON(w, http.StatusCreated, sub)
}

s.dataOut <- &out
Expand Down Expand Up @@ -256,12 +250,15 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}
sub, err := s.pubSubAPI.GetSubscription(subscriptionID)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, "")
return

for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
sub, err := s.subscriberAPI.GetSubscription(c, subscriptionID)
if err == nil {
respondWithJSON(w, http.StatusOK, sub)
return
}
}
respondWithJSON(w, http.StatusOK, sub)
respondWithStatusCode(w, http.StatusNotFound, "")
}

func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
Expand All @@ -278,9 +275,11 @@ func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
}
respondWithJSON(w, http.StatusOK, pub)
}

func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) {
b, err := s.pubSubAPI.GetSubscriptionsFromFile()
b, err := s.subscriberAPI.GetSubscriptions()
if err != nil {
log.Errorf("error loading subscriber data %v", err)
respondWithError(w, "error loading subscriber data")
return
}
Expand Down Expand Up @@ -322,21 +321,21 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) {
return
}

if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
clientIDs := s.subscriberAPI.GetClientIDBySubID(subscriptionID)
if len(clientIDs) == 0 {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}

// update configMap
for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
for _, c := range clientIDs {
if err := s.subscriberAPI.DeleteSubscription(c, subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
return
}
}

// update configMap
for _, subs := range s.subscriberAPI.SubscriberStore.Store {
cevent, _ := subs.CreateCloudEvents()
out := channel.DataChan{
Expand Down Expand Up @@ -441,12 +440,12 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
}
}
} else {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

if sub == nil {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

Expand All @@ -471,14 +470,14 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
respondWithError(w, statusErr.Error())
respondWithStatusCode(w, http.StatusNotFound, statusErr.Error())
} else if out.Data != nil {
respondWithJSON(w, http.StatusOK, *out.Data)
} else {
respondWithError(w, "event not found")
respondWithStatusCode(w, http.StatusNotFound, "event not found")
}
} else {
respondWithError(w, "onReceive function not defined")
respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined")
}
}

Expand Down
4 changes: 3 additions & 1 deletion v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
// Server defines rest routes server object
type Server struct {
port int
apiHost string
apiPath string
//use dataOut chanel to write to configMap
dataOut chan<- *channel.DataChan
Expand Down Expand Up @@ -138,12 +139,13 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string,
func InitServer(port int, apiHost, apiPath, storePath string,
dataOut chan<- *channel.DataChan, closeCh <-chan struct{},
onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
apiHost: apiHost,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
Expand Down
11 changes: 6 additions & 5 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
types2 "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"

restapi "github.com/redhat-cne/rest-api"
restapi "github.com/redhat-cne/rest-api/v2"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/event/ptp"
Expand All @@ -50,6 +50,7 @@ var (
closeCh chan struct{}
wg sync.WaitGroup
port = 8989
apHost = "localhost"
apPath = "/routes/cne/v1/"
resource = "test/test"
resourceNoneSubscribed = "test/nonesubscribed"
Expand All @@ -64,7 +65,7 @@ func init() {
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh)
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil)
//start http server
server.Start()

Expand Down Expand Up @@ -100,7 +101,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestServer_GetCurrentState_withoutSubscription(t *testing.T) {
s, err2 := io.ReadAll(resp.Body)
assert.Nil(t, err2)
log.Infof("tedt %s ", string(s))
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestPingStatusStatusCode(t *testing.T) {
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestServer_GetNonExistingSubscription(t *testing.T) {
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestDummyStatusCode(t *testing.T) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b1356a1

Please sign in to comment.