Skip to content

Commit

Permalink
Update gnmi-native to support subscribe poll mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ganglyu committed Jul 8, 2024
1 parent 7801415 commit 32ba317
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 8 deletions.
3 changes: 3 additions & 0 deletions gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {

if origin == "openconfig" {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
} else if IsNativeOrigin(origin) {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "")
} else if len(origin) != 0 {
return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin)
} else if target == "" {
Expand All @@ -177,6 +179,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
/* For any other target or no target create new Transl Client. */
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
}
defer dc.Close()

if err != nil {
return grpc.Errorf(codes.NotFound, "%v", err)
Expand Down
33 changes: 31 additions & 2 deletions patches/gnmi_cli.all.patch
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@
)

func init() {
@@ -278,6 +283,22 @@
@@ -93,6 +98,7 @@
flag.StringVar(&cfg.DisplayIndent, "display_indent", " ", "Output line, per nesting-level indent.")
flag.StringVar(&cfg.DisplayType, "display_type", "group", "Display output type (g, group, s, single, p, proto).")
flag.StringVar(&q.Target, "target", "", "Name of the gNMI target.")
+ flag.StringVar(&q.Origin, "origin", "", "Name of the gNMI origin.")
flag.DurationVar(&q.Timeout, "timeout", 30*time.Second, "Terminate query if no RPC is established within the timeout duration.")
flag.StringVar(&cfg.Timestamp, "timestamp", "", "Specify timestamp formatting in output. One of (<empty string>, on, raw, <FORMAT>) where <empty string> is disabled, on is human readable, raw is int64 nanos since epoch, and <FORMAT> is according to golang time.Format(<FORMAT>)")
flag.BoolVar(&cfg.DisplaySize, "display_size", false, "Display the total size of query response.")
@@ -278,6 +284,22 @@
if len(*queryFlag) == 0 {
return errors.New("--query must be set")
}
Expand All @@ -48,7 +56,16 @@
if err != nil {
--- ./github.com/openconfig/gnmi/client/query.go 2019-11-22 14:03:29.839103602 -0800
+++ ./github.com/openconfig/gnmi/client/query.go 2019-10-11 13:48:49.226145599 -0700
@@ -172,6 +172,10 @@
@@ -131,6 +131,8 @@
// clients will only handle the first element.
Addrs []string
AddressChains [][]string
+ // Origin is the target of the query.
+ Origin string
// Target is the target of the query. Maybe empty if the query is performed
// against an end target vs. a collector.
Target string
@@ -172,6 +174,10 @@
// SubReq is an optional field. If not nil, gnmi client implementation uses
// it rather than generating from client.Query while sending gnmi Subscribe RPC.
SubReq *gpb.SubscribeRequest
Expand All @@ -70,3 +87,15 @@
default:
return nil, fmt.Errorf("non-scalar type %+v", tv.Value)
}

--- ./github.com/openconfig/gnmi/client/gnmi/client.go 2019-11-22 14:03:29.847103498 -0800
+++ ./github.com/openconfig/gnmi/client/gnmi/client.go 2019-10-11 13:48:49.234145530 -0700
@@ -246,7 +246,7 @@
s := &gpb.SubscribeRequest_Subscribe{
Subscribe: &gpb.SubscriptionList{
Mode: getType(q.Type),
- Prefix: &gpb.Path{Target: q.Target},
+ Prefix: &gpb.Path{Target: q.Target, Origin: q.Origin},
},
}
if q.UpdatesOnly {
44 changes: 43 additions & 1 deletion sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,49 @@ func (c *MixedDbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *s
}

func (c *MixedDbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
return
c.w = w
defer c.w.Done()
c.q = q
c.channel = poll

for {
_, more := <-c.channel
if !more {
log.V(1).Infof("%v poll channel closed, exiting pollDb routine", c)
return
}
t1 := time.Now()
for _, gnmiPath := range c.paths {
tblPaths, err := c.getDbtablePath(gnmiPath, nil)
if err != nil {
log.V(2).Infof("Unable to get table path due to err: %v", err)
return
}
val, err := c.tableData2TypedValue(tblPaths, nil)
if err != nil {
log.V(2).Infof("Unable to create gnmi TypedValue due to err: %v", err)
return
}

spbv := &spb.Value{
Prefix: c.prefix,
Path: gnmiPath,
Timestamp: time.Now().UnixNano(),
SyncResponse: false,
Val: val,
}
c.q.Put(Value{spbv})
log.V(6).Infof("Added spbv #%v", spbv)
}

c.q.Put(Value{
&spb.Value{
Timestamp: time.Now().UnixNano(),
SyncResponse: true,
},
})
log.V(4).Infof("Sync done, poll time taken: %v ms", int64(time.Since(t1)/time.Millisecond))
}
}

func (c *MixedDbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
Expand Down
18 changes: 17 additions & 1 deletion test/test_gnmi_configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import json
import time
from utils import gnmi_set, gnmi_get, gnmi_dump
from utils import gnmi_set, gnmi_get, gnmi_dump, gnmi_subscribe_poll

import pytest

Expand Down Expand Up @@ -412,3 +412,19 @@ def test_gnmi_replace_invalid_01(self):
ret, msg = gnmi_set([], [], update_list)
assert ret != 0, "Failed to detect invalid replace path"
assert "Invalid elem length" in msg, msg

def test_gnmi_poll_01(self):
path = "/CONFIG_DB/localhost/DEVICE_METADATA"
cnt = 3
interval = 1
ret, msg = gnmi_subscribe_poll(path, interval, cnt, timeout=0)
assert ret == 0, 'Fail to subscribe: ' + msg
assert msg.count("DEVICE_METADATA") == cnt, 'Invalid result: ' + msg

def test_gnmi_poll_invalid_01(self):
path = "/CONFIG_DB/localhost/INVALID_TABLE"
cnt = 3
interval = 1
ret, msg = gnmi_subscribe_poll(path, interval, cnt, timeout=10)
assert ret == 0, 'Fail to subscribe: ' + msg
assert "rpc error" in msg, 'Invalid result: ' + msg
19 changes: 15 additions & 4 deletions test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
def run_cmd(cmd):
res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
res.wait()
if res.returncode:
msg = str(res.stderr.read(), encoding='utf-8')
else:
msg = str(res.stdout.read(), encoding='utf-8')
msg = str(res.stderr.read(), encoding='utf-8') + str(res.stdout.read(), encoding='utf-8')
return res.returncode, msg

def gnmi_set(delete_list, update_list, replace_list):
Expand Down Expand Up @@ -157,6 +154,20 @@ def gnmi_capabilities():
ret, msg = run_cmd(cmd)
return ret, msg

def gnmi_subscribe_poll(gnmi_path, interval, count, timeout):
path = os.getcwd()
cmd = path + '/build/bin/gnmi_cli '
cmd += '-client_types=gnmi -a 127.0.0.1:8080 -logtostderr -insecure '
# Use sonic-db as default origin
cmd += '-origin=sonic-db '
if timeout:
cmd += '-streaming_timeout=10 '
cmd += '-query_type=polling '
cmd += '-polling_interval %us -count %u ' % (interval, count)
cmd += '-q %s' % (gnmi_path)
ret, msg = run_cmd(cmd)
return ret, msg

def gnmi_dump(name):
path = os.getcwd()
cmd = 'sudo ' + path + '/build/bin/gnmi_dump'
Expand Down

0 comments on commit 32ba317

Please sign in to comment.