Skip to content

Commit

Permalink
Merge branch 'file_stat_gnoi' of https://github.com/isabelmsft/sonic-…
Browse files Browse the repository at this point in the history
…gnmi into file_stat_gnoi
  • Loading branch information
isabelmsft committed Jul 30, 2024
2 parents 14bf34a + 8944bb1 commit 328e631
Show file tree
Hide file tree
Showing 11 changed files with 1,073 additions and 48 deletions.
3 changes: 3 additions & 0 deletions gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {

if origin == "openconfig" {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
} else if IsNativeOrigin(origin) {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "")
} else if len(origin) != 0 {
return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin)
} else if target == "" {
Expand All @@ -177,6 +179,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
/* For any other target or no target create new Transl Client. */
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
}
defer dc.Close()

if err != nil {
return grpc.Errorf(codes.NotFound, "%v", err)
Expand Down
14 changes: 12 additions & 2 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,22 @@ func (srv *Server) Serve() error {
return srv.s.Serve(srv.lis)
}

func (srv *Server) ForceStop() {
s := srv.s
if s == nil {
log.Errorf("ForceStop() failed: not initialized")
return
}
s.Stop()
}

func (srv *Server) Stop() {
s := srv.s
if s == nil {
log.Errorf("Stop() failed: not initialized")
return
}
s.Stop()
s.GracefulStop()
}

// Address returns the port the Server is listening to.
Expand Down Expand Up @@ -553,9 +562,10 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe
err = dc.Set(req.GetDelete(), req.GetReplace(), req.GetUpdate())
if err != nil {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
} else {
s.SaveStartupConfig()
}

s.SaveStartupConfig()
return &gnmipb.SetResponse{
Prefix: req.GetPrefix(),
Response: results,
Expand Down
26 changes: 19 additions & 7 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3419,7 +3419,7 @@ func TestClientConnections(t *testing.T) {
func TestConnectionDataSet(t *testing.T) {
s := createServer(t, 8081)
go runServer(t, s)
defer s.Stop()
defer s.ForceStop()

tests := []struct {
desc string
Expand Down Expand Up @@ -3491,16 +3491,16 @@ func TestConnectionsKeepAlive(t *testing.T) {
defer s.Stop()

tests := []struct {
desc string
q client.Query
want []client.Notification
poll int
desc string
q client.Query
want []client.Notification
poll int
}{
{
desc: "Testing KeepAlive with goroutine count",
poll: 3,
q: client.Query{
Target: "COUNTERS_DB",
Target: "COUNTERS_DB",
Type: client.Poll,
Queries: []client.Path{{"COUNTERS", "Ethernet*"}},
TLS: &tls.Config{InsecureSkipVerify: true},
Expand All @@ -3511,12 +3511,14 @@ func TestConnectionsKeepAlive(t *testing.T) {
},
},
}
for _, tt := range tests {
for _, tt := range(tests) {
var clients []*cacheclient.CacheClient
for i := 0; i < 5; i++ {
t.Run(tt.desc, func(t *testing.T) {
q := tt.q
q.Addrs = []string{"127.0.0.1:8081"}
c := client.New()
clients = append(clients, c)
wg := new(sync.WaitGroup)
wg.Add(1)

Expand All @@ -3538,6 +3540,9 @@ func TestConnectionsKeepAlive(t *testing.T) {
}
})
}
for _, cacheClient := range(clients) {
cacheClient.Close()
}
}
}

Expand Down Expand Up @@ -3941,6 +3946,13 @@ func TestNilServerStop(t *testing.T) {
s.Stop()
}

func TestNilServerForceStop(t *testing.T) {
// Create a server with nil grpc server, such that s.ForceStop is called with nil value
t.Log("Expecting s.ForceStop to log error as server is nil")
s := &Server{}
s.ForceStop()
}

func TestInvalidServer(t *testing.T) {
s := createInvalidServer(t, 9000)
if s != nil {
Expand Down
33 changes: 31 additions & 2 deletions patches/gnmi_cli.all.patch
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@
)

func init() {
@@ -278,6 +283,22 @@
@@ -93,6 +98,7 @@
flag.StringVar(&cfg.DisplayIndent, "display_indent", " ", "Output line, per nesting-level indent.")
flag.StringVar(&cfg.DisplayType, "display_type", "group", "Display output type (g, group, s, single, p, proto).")
flag.StringVar(&q.Target, "target", "", "Name of the gNMI target.")
+ flag.StringVar(&q.Origin, "origin", "", "Name of the gNMI origin.")
flag.DurationVar(&q.Timeout, "timeout", 30*time.Second, "Terminate query if no RPC is established within the timeout duration.")
flag.StringVar(&cfg.Timestamp, "timestamp", "", "Specify timestamp formatting in output. One of (<empty string>, on, raw, <FORMAT>) where <empty string> is disabled, on is human readable, raw is int64 nanos since epoch, and <FORMAT> is according to golang time.Format(<FORMAT>)")
flag.BoolVar(&cfg.DisplaySize, "display_size", false, "Display the total size of query response.")
@@ -278,6 +284,22 @@
if len(*queryFlag) == 0 {
return errors.New("--query must be set")
}
Expand All @@ -48,7 +56,16 @@
if err != nil {
--- ./github.com/openconfig/gnmi/client/query.go 2019-11-22 14:03:29.839103602 -0800
+++ ./github.com/openconfig/gnmi/client/query.go 2019-10-11 13:48:49.226145599 -0700
@@ -172,6 +172,10 @@
@@ -131,6 +131,8 @@
// clients will only handle the first element.
Addrs []string
AddressChains [][]string
+ // Origin is the target of the query.
+ Origin string
// Target is the target of the query. Maybe empty if the query is performed
// against an end target vs. a collector.
Target string
@@ -172,6 +174,10 @@
// SubReq is an optional field. If not nil, gnmi client implementation uses
// it rather than generating from client.Query while sending gnmi Subscribe RPC.
SubReq *gpb.SubscribeRequest
Expand All @@ -70,3 +87,15 @@
default:
return nil, fmt.Errorf("non-scalar type %+v", tv.Value)
}

--- ./github.com/openconfig/gnmi/client/gnmi/client.go 2019-11-22 14:03:29.847103498 -0800
+++ ./github.com/openconfig/gnmi/client/gnmi/client.go 2019-10-11 13:48:49.234145530 -0700
@@ -246,7 +246,7 @@
s := &gpb.SubscribeRequest_Subscribe{
Subscribe: &gpb.SubscriptionList{
Mode: getType(q.Type),
- Prefix: &gpb.Path{Target: q.Target},
+ Prefix: &gpb.Path{Target: q.Target, Origin: q.Origin},
},
}
if q.UpdatesOnly {
123 changes: 122 additions & 1 deletion sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"encoding/json"
"fmt"

"github.com/Workiva/go-datastructures/queue"
"github.com/agiledragon/gomonkey/v2"
"github.com/jipanyang/gnxi/utils/xpath"
"github.com/sonic-net/sonic-gnmi/swsscommon"
"github.com/sonic-net/sonic-gnmi/test_utils"
Expand Down Expand Up @@ -445,6 +447,94 @@ func TestParseDatabase(t *testing.T) {
}
}

func TestSubscribeInternal(t *testing.T) {
// Test StreamRun
{
pq := queue.NewPriorityQueue(1, false)
w := sync.WaitGroup{}
stop := make(chan struct{}, 1)
client := MixedDbClient {}
req := gnmipb.SubscriptionList{
Subscription: nil,
}
path, _ := xpath.ToGNMIPath("/abc/dummy")
client.paths = append(client.paths, path)
client.dbkey = swsscommon.NewSonicDBKey()
defer swsscommon.DeleteSonicDBKey(client.dbkey)
RedisDbMap = nil
stop <- struct{}{}
w.Add(1)
client.StreamRun(pq, stop, &w, &req)
}

// Test streamSampleSubscription
{
pq := queue.NewPriorityQueue(1, false)
w := sync.WaitGroup{}
client := MixedDbClient {}
sub := gnmipb.Subscription{
SampleInterval: 1000,
}
client.q = pq
client.w = &w
client.w.Add(1)
client.synced.Add(1)
client.streamSampleSubscription(&sub, false)
}

// Test streamSampleSubscription
{
pq := queue.NewPriorityQueue(1, false)
w := sync.WaitGroup{}
client := MixedDbClient {}
path, _ := xpath.ToGNMIPath("/abc/dummy")
sub := gnmipb.Subscription{
SampleInterval: 1000000000,
Path: path,
}
RedisDbMap = nil
client.q = pq
client.w = &w
client.w.Add(1)
client.synced.Add(1)
client.dbkey = swsscommon.NewSonicDBKey()
defer swsscommon.DeleteSonicDBKey(client.dbkey)
client.streamSampleSubscription(&sub, false)
}

// Test dbFieldSubscribe
{
pq := queue.NewPriorityQueue(1, false)
w := sync.WaitGroup{}
client := MixedDbClient {}
path, _ := xpath.ToGNMIPath("/abc/dummy")
RedisDbMap = nil
client.q = pq
client.w = &w
client.w.Add(1)
client.synced.Add(1)
client.dbkey = swsscommon.NewSonicDBKey()
defer swsscommon.DeleteSonicDBKey(client.dbkey)
client.dbFieldSubscribe(path, true, time.Second)
}

// Test dbTableKeySubscribe
{
pq := queue.NewPriorityQueue(1, false)
w := sync.WaitGroup{}
client := MixedDbClient {}
path, _ := xpath.ToGNMIPath("/abc/dummy")
RedisDbMap = nil
client.q = pq
client.w = &w
client.w.Add(1)
client.synced.Add(1)
client.dbkey = swsscommon.NewSonicDBKey()
defer swsscommon.DeleteSonicDBKey(client.dbkey)
client.dbTableKeySubscribe(path, time.Second, true)
}
}

func mockGetFunc() ([]byte, error) {
return nil, errors.New("mock error")
}
Expand Down Expand Up @@ -557,7 +647,7 @@ func TestRetryHelper(t *testing.T) {
exeCount++
if returnError {
returnError = false
return fmt.Errorf("connection_reset")
return fmt.Errorf("zmq connection break, endpoint: tcp://127.0.0.1:2234")
}
return nil
})
Expand All @@ -574,6 +664,37 @@ func TestRetryHelper(t *testing.T) {
swsscommon.DeleteZmqServer(zmqServer)
}

func TestRetryHelperReconnect(t *testing.T) {
// create ZMQ server
zmqServer := swsscommon.NewZmqServer("tcp://*:2234")

// when config table is empty, will authorize with PopulateAuthStruct
zmqClientRemoved := false
mockremoveZmqClient := gomonkey.ApplyFunc(removeZmqClient, func(zmqClient swsscommon.ZmqClient) (error) {
zmqClientRemoved = true
return nil
})
defer mockremoveZmqClient.Reset()

// create ZMQ client side
zmqAddress := "tcp://127.0.0.1:2234"
zmqClient := swsscommon.NewZmqClient(zmqAddress)
exeCount := 0
RetryHelper(
zmqClient,
func () (err error) {
exeCount++
return fmt.Errorf("zmq connection break, endpoint: tcp://127.0.0.1:2234")
})

if !zmqClientRemoved {
t.Errorf("RetryHelper does not remove ZMQ client for reconnect")
}

swsscommon.DeleteZmqClient(zmqClient)
swsscommon.DeleteZmqServer(zmqServer)
}

func TestGetDpuAddress(t *testing.T) {
// prepare data according to design doc
// Design doc: https://github.com/sonic-net/SONiC/blob/master/doc/smart-switch/ip-address-assigment/smart-switch-ip-address-assignment.md?plain=1
Expand Down
Loading

0 comments on commit 328e631

Please sign in to comment.