diff --git a/README.md b/README.md index 7729b5b5..6dc08886 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,67 @@ newyork: ☀️ +2°C Now the `nats` CLI parses the subject, extracts the `{london,newyork}` from the subjects and calls `curl`, replacing `{{2}}` with the body of the 2nd subject token - `{london,newyork}`. +## Translating message data using a converter command + +Additional to the raw output of messages using `nats sub` and `nats stream view` you can also translate the message data by running it through a command. + +The command receives the message data as raw bytes through stdin and the output of the command will be the shown output for the message. There is the additional possibility to add the filter subject by using `{{Subject}}` as part of the arguments for the tranlation command. + +#### Examples for using the translation feature: + +Here we use the [jq](https://github.com/stedolan/jq) tool to format our json message paylot into a more readable format: + +We subscribe to a subject that will receive json data. +``` +nats sub --translate 'jq .' cli.json +``` +Now we publish some example data. +``` +nats pub cli.json '{"task":"demo","duration":60}' +``` + +The Output will show the message formatted. +``` +23:54:35 Subscribing on cli.json +[#1] Received on "cli.json" +{ + "task": "demo", + "duration": 60 +} +``` + +Another example is creating hex dumps from any message to avoid terminal corruption. + +By changing the subscription into: + +``` +go run ./nats/ sub --translate 'xxd' cli.json +``` + +We will get the following output for the same published msg: +``` +00:02:56 Subscribing on cli.json +[#1] Received on "cli.json" +00000000: 7b22 7461 736b 223a 2264 656d 6f22 2c22 {"task":"demo"," +00000010: 6475 7261 7469 6f6e 223a 3630 7d duration":60} +``` + +#### Examples for using the translation feature with template: + +A somewhat artificial example using the subject as argument would be: +``` +nats sub --translate "sed 's/\(.*\)/{{Subject}}: \1/'" cli.json +``` + +Output +``` +00:22:19 Subscribing on cli.json +[#1] Received on "cli.json" +cli.json: {"task":"demo","duration":60} +``` + +The translation feature makes it possible to write specialized or universal translators to aid in debugging messages in streams or core nats. + ## Benchmarking and Latency Testing Benchmarking and latency testing is key requirement for evaluating the production preparedness of your NATS network. diff --git a/cli/filter_data_through_cmd.go b/cli/filter_data_through_cmd.go new file mode 100644 index 00000000..9a038630 --- /dev/null +++ b/cli/filter_data_through_cmd.go @@ -0,0 +1,62 @@ +package cli + +import ( + "bytes" + "fmt" + "github.com/google/shlex" + "os/exec" + "strings" + "text/template" +) + +func outPutMSGBody(data []byte, filter, subject, stream string) { + if len(data) == 0 { + fmt.Println("nil body") + return + } + + data, err := filterDataThroughCmd(data, filter, subject, stream) + if err != nil { + // using q here so raw binary data will be escaped + fmt.Printf("%q\nError while translating msg body: %s\n\n", data, err.Error()) + return + } + output := string(data) + fmt.Println(output) + if !strings.HasSuffix(output, "\n") { + fmt.Println() + } +} + +func filterDataThroughCmd(data []byte, filter, subject, stream string) ([]byte, error) { + if filter == "" { + return data, nil + } + funcMap := template.FuncMap{ + "Subject": func() string { return subject }, + "Stream": func() string { return stream }, + } + + tmpl, err := template.New("translate").Funcs(funcMap).Parse(filter) + if err != nil { + return nil, err + } + var builder strings.Builder + err = tmpl.Execute(&builder, nil) + if err != nil { + return nil, err + } + + parts, err := shlex.Split(builder.String()) + if err != nil { + return nil, fmt.Errorf("the filter command line could not be parsed: %w", err) + } + cmd := parts[0] + args := parts[1:] + + runner := exec.Command(cmd, args...) + // pass the message as string to stdin + runner.Stdin = bytes.NewReader(data) + // maybe we want to do something on error? + return runner.CombinedOutput() +} diff --git a/cli/stream_command.go b/cli/stream_command.go index 4cebe1f6..bdc8d6c5 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -134,6 +134,7 @@ type streamCmd struct { vwStartDelta time.Duration vwPageSize int vwRaw bool + vwTranslate string vwSubject string dryRun bool @@ -300,6 +301,7 @@ func configureStreamCommand(app commandHost) { strView.Flag("id", "Start at a specific message Sequence").IntVar(&c.vwStartId) strView.Flag("since", "Delivers messages received since a duration like 1d3h5m2s").DurationVar(&c.vwStartDelta) strView.Flag("raw", "Show the raw data received").UnNegatableBoolVar(&c.vwRaw) + strView.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.vwTranslate) strView.Flag("subject", "Filter the stream using a subject").StringVar(&c.vwSubject) strGet := str.Command("get", "Retrieves a specific message from a Stream").Action(c.getAction) @@ -307,6 +309,7 @@ func configureStreamCommand(app commandHost) { strGet.Arg("id", "Message Sequence to retrieve").Int64Var(&c.msgID) strGet.Flag("last-for", "Retrieves the message for a specific subject").Short('S').PlaceHolder("SUBJECT").StringVar(&c.filterSubject) strGet.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json) + strGet.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.vwTranslate) strBackup := str.Command("backup", "Creates a backup of a Stream over the NATS network").Alias("snapshot").Action(c.backupAction) strBackup.Arg("stream", "Stream to backup").Required().StringVar(&c.stream) @@ -702,15 +705,7 @@ func (c *streamCmd) viewAction(_ *fisk.ParseContext) error { } fmt.Println() - if len(msg.Data) == 0 { - fmt.Println("nil body") - } else { - fmt.Println(string(msg.Data)) - if !strings.HasSuffix(string(msg.Data), "\n") { - fmt.Println() - } - } - + outPutMSGBody(msg.Data, c.vwTranslate, msg.Subject, meta.Stream()) } if last { @@ -2705,9 +2700,7 @@ func (c *streamCmd) getAction(_ *fisk.ParseContext) (err error) { } fmt.Println() } - - fmt.Println(string(item.Data)) - fmt.Println() + outPutMSGBody(item.Data, c.vwTranslate, item.Subject, c.stream) return nil } diff --git a/cli/sub_command.go b/cli/sub_command.go index 8aebe9a8..e21a02ad 100644 --- a/cli/sub_command.go +++ b/cli/sub_command.go @@ -38,6 +38,7 @@ type subCmd struct { queue string durable string raw bool + translate string jsAck bool inbox bool match bool @@ -67,6 +68,7 @@ func configureSubCommand(app commandHost) { act.Flag("queue", "Subscribe to a named queue group").StringVar(&c.queue) act.Flag("durable", "Use a durable consumer (requires JetStream)").StringVar(&c.durable) act.Flag("raw", "Show the raw data received").Short('r').UnNegatableBoolVar(&c.raw) + act.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.translate) act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck) act.Flag("match-replies", "Match replies to requests").UnNegatableBoolVar(&c.match) act.Flag("inbox", "Subscribes to a generate inbox").Short('i').UnNegatableBoolVar(&c.inbox) @@ -499,7 +501,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) { fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), msg.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence()) } - prettyPrintMsg(msg, c.headersOnly) + prettyPrintMsg(msg, c.headersOnly, c.translate) if reply != nil { if info == nil { @@ -510,7 +512,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) { fmt.Printf("[#%d] Matched reply JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), reply.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence()) } - prettyPrintMsg(reply, c.headersOnly) + prettyPrintMsg(reply, c.headersOnly, c.translate) } @@ -541,7 +543,7 @@ func dumpMsg(msg *nats.Msg, stdout bool, filepath string, ctr uint) { } } -func prettyPrintMsg(msg *nats.Msg, headersOnly bool) { +func prettyPrintMsg(msg *nats.Msg, headersOnly bool, filter string) { if len(msg.Header) > 0 { for h, vals := range msg.Header { for _, val := range vals { @@ -553,9 +555,6 @@ func prettyPrintMsg(msg *nats.Msg, headersOnly bool) { } if !headersOnly { - fmt.Println(string(msg.Data)) - if !strings.HasSuffix(string(msg.Data), "\n") { - fmt.Println() - } + outPutMSGBody(msg.Data, filter, msg.Subject, "") } } diff --git a/dependencies.md b/dependencies.md index 31ac75e6..4c112d49 100644 --- a/dependencies.md +++ b/dependencies.md @@ -26,6 +26,7 @@ This file lists the dependencies used in this repository. | github.com/nats-io/nats.go | Apache License 2.0 | | github.com/nats-io/nkeys | Apache License 2.0 | | github.com/nats-io/nuid | Apache License 2.0 | +| github.com/google/shlex | Apache License 2.0 | | github.com/tylertreat/hdrhistogram-writer | Apache License 2.0 | | github.com/xeipuuv/gojsonpointer | Apache License 2.0 | | github.com/xeipuuv/gojsonreference | Apache License 2.0 | diff --git a/go.mod b/go.mod index 9bd85b0c..776e7447 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/fatih/color v1.15.0 github.com/ghodss/yaml v1.0.0 github.com/google/go-cmp v0.5.9 + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/gosuri/uiprogress v0.0.1 github.com/guptarohit/asciigraph v0.5.5 github.com/jedib0t/go-pretty/v6 v6.4.6 @@ -33,8 +34,8 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/gosuri/uilive v0.0.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect diff --git a/go.sum b/go.sum index d78c1d42..648ebb5d 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDe github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/choria-io/fisk v0.4.0 h1:V+mh3OpcmE0uBcihUA18dRJeL1J8YUCCLbZZQRa7GMY= github.com/choria-io/fisk v0.4.0/go.mod h1:3Rc9XxqKC4y9wBf2GfQ4ovJ1VKELAWcU0J33M/Zgjvs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -34,12 +34,14 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw=