Skip to content

Commit

Permalink
Adding the possibility to translate the msg.Data before output.
Browse files Browse the repository at this point in the history
  • Loading branch information
oderwat committed Mar 18, 2023
1 parent bcc57a2 commit fcbc1b5
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 10 deletions.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,67 @@ newyork: ☀️ +2°C
Now the `nats` CLI parses the subject, extracts the `{london,newyork}` from the subjects and calls `curl`, replacing
`{{2}}` with the body of the 2nd subject token - `{london,newyork}`.

## Translating message data using a converter command

Additional to the raw output of messages using `nats sub` and `nats stream view` you can also translate the message data by running it through a command.

The command receives the message data as raw bytes through stdin and the output of the command will be the shown output for the message. There is the additional possibility to add the filter subject by using `{{subject}}` as part of the arguments for the tranlation command.

#### Examples for using the translation feature:

Here we use the [jq](https://github.com/stedolan/jq) tool to format our json message paylot into a more readable format:

We subscribe to a subject that will receive json data.
```
nats sub --translate 'jq .' cli.json
```
Now we publish some example data.
```
nats pub cli.json '{"task":"demo","duration":60}'
```

The Output will show the message formatted.
```
23:54:35 Subscribing on cli.json
[#1] Received on "cli.json"
{
"task": "demo",
"duration": 60
}
```

Another example is creating hex dumps from any message to avoid terminal corruption.

By changing the subscription into:

```
go run ./nats/ sub --translate 'xxd' cli.json
```

We will get the following output for the same published msg:
```
00:02:56 Subscribing on cli.json
[#1] Received on "cli.json"
00000000: 7b22 7461 736b 223a 2264 656d 6f22 2c22 {"task":"demo","
00000010: 6475 7261 7469 6f6e 223a 3630 7d duration":60}
```

#### Examples for using the translation feature with template:

A somewhat artificial example using the subject as argument would be:
```
nats sub --translate "sed 's/\(.*\)/{{subject}}: \1/'" cli.json
```

Output
```
00:22:19 Subscribing on cli.json
[#1] Received on "cli.json"
cli.json: {"task":"demo","duration":60}
```

The translation feature makes it possible to write specialized or universal translators to aid in debugging messages in streams or core nats.

## Benchmarking and Latency Testing

Benchmarking and latency testing is key requirement for evaluating the production preparedness of your NATS network.
Expand Down
93 changes: 93 additions & 0 deletions cli/filter_data_through_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cli

import (
"bytes"
"errors"
"fmt"
"os/exec"
"strings"
)

func filterDataThroughCmd(data []byte, filter string, subject string) ([]byte, error) {
parts, err := parseCommandLine(filter)
if err != nil {
return nil, fmt.Errorf("the filter command line could not be parsed: %w", err)
}
cmd := parts[0]
args := parts[1:]
// maybe we want to replace some strings to give the filter something to reason about?
for idx, arg := range args {
args[idx] = strings.ReplaceAll(arg, "{{subject}}", subject)
}
runner := exec.Command(cmd, args...)
// pass the message as string to stdin
runner.Stdin = bytes.NewReader(data)
// maybe we want to do something on error?
return runner.CombinedOutput()
}

func parseCommandLine(command string) ([]string, error) {
// copied from https://stackoverflow.com/questions/34118732/parse-a-command-line-string-into-flags-and-arguments-in-golang
// could be optimized but I think it is good enough for this use case
var args []string
state := "start"
current := ""
quote := "\""
escapeNext := true
for _, c := range command {

if state == "quotes" {
if string(c) != quote {
current += string(c)
} else {
args = append(args, current)
current = ""
state = "start"
}
continue
}

if escapeNext {
current += string(c)
escapeNext = false
continue
}

if c == '\\' {
escapeNext = true
continue
}

if c == '"' || c == '\'' {
state = "quotes"
quote = string(c)
continue
}

if state == "arg" {
if c == ' ' || c == '\t' {
args = append(args, current)
current = ""
state = "start"
} else {
current += string(c)
}
continue
}

if c != ' ' && c != '\t' {
state = "arg"
current += string(c)
}
}

if state == "quotes" {
return []string{}, errors.New(fmt.Sprintf("Unclosed quote in command line: %s", command))
}

if current != "" {
args = append(args, current)
}

return args, nil
}
30 changes: 25 additions & 5 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type streamCmd struct {
vwStartDelta time.Duration
vwPageSize int
vwRaw bool
vwTranslate string
vwSubject string

dryRun bool
Expand Down Expand Up @@ -299,6 +300,7 @@ func configureStreamCommand(app commandHost) {
strView.Flag("id", "Start at a specific message Sequence").IntVar(&c.vwStartId)
strView.Flag("since", "Delivers messages received since a duration like 1d3h5m2s").DurationVar(&c.vwStartDelta)
strView.Flag("raw", "Show the raw data received").UnNegatableBoolVar(&c.vwRaw)
strView.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.vwTranslate)
strView.Flag("subject", "Filter the stream using a subject").StringVar(&c.vwSubject)

strGet := str.Command("get", "Retrieves a specific message from a Stream").Action(c.getAction)
Expand Down Expand Up @@ -704,9 +706,18 @@ func (c *streamCmd) viewAction(_ *fisk.ParseContext) error {
if len(msg.Data) == 0 {
fmt.Println("nil body")
} else {
fmt.Println(string(msg.Data))
if !strings.HasSuffix(string(msg.Data), "\n") {
fmt.Println()
data := msg.Data
var errFilter error
if c.vwTranslate != "" {
data, errFilter = filterDataThroughCmd(data, c.vwTranslate, msg.Subject)
}
if errFilter != nil {
fmt.Printf("%s\nError: %s\n\n", data, errFilter.Error())
} else {
fmt.Println(string(data))
if !strings.HasSuffix(string(data), "\n") {
fmt.Println()
}
}
}

Expand Down Expand Up @@ -2705,8 +2716,17 @@ func (c *streamCmd) getAction(_ *fisk.ParseContext) (err error) {
fmt.Println()
}

fmt.Println(string(item.Data))
fmt.Println()
data := item.Data
var errFilter error
if c.vwTranslate != "" {
data, errFilter = filterDataThroughCmd(data, c.vwTranslate, item.Subject)
}
if errFilter != nil {
fmt.Printf("%s\nError: %s\n\n", data, errFilter.Error())
} else {
fmt.Println(string(data))
fmt.Println()
}
return nil
}

Expand Down
22 changes: 17 additions & 5 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type subCmd struct {
queue string
durable string
raw bool
translate string
jsAck bool
inbox bool
match bool
Expand Down Expand Up @@ -62,6 +63,7 @@ func configureSubCommand(app commandHost) {
act.Flag("queue", "Subscribe to a named queue group").StringVar(&c.queue)
act.Flag("durable", "Use a durable consumer (requires JetStream)").StringVar(&c.durable)
act.Flag("raw", "Show the raw data received").Short('r').UnNegatableBoolVar(&c.raw)
act.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.translate)
act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck)
act.Flag("match-replies", "Match replies to requests").UnNegatableBoolVar(&c.match)
act.Flag("inbox", "Subscribes to a generate inbox").Short('i').UnNegatableBoolVar(&c.inbox)
Expand Down Expand Up @@ -402,7 +404,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), msg.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(msg, c.headersOnly)
prettyPrintMsg(msg, c.headersOnly, c.translate)

if reply != nil {
if info == nil {
Expand All @@ -413,7 +415,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) {
fmt.Printf("[#%d] Matched reply JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), reply.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(reply, c.headersOnly)
prettyPrintMsg(reply, c.headersOnly, c.translate)

}

Expand Down Expand Up @@ -444,7 +446,7 @@ func dumpMsg(msg *nats.Msg, stdout bool, filepath string, ctr uint) {
}
}

func prettyPrintMsg(msg *nats.Msg, headersOnly bool) {
func prettyPrintMsg(msg *nats.Msg, headersOnly bool, filter string) {
if len(msg.Header) > 0 {
for h, vals := range msg.Header {
for _, val := range vals {
Expand All @@ -456,8 +458,18 @@ func prettyPrintMsg(msg *nats.Msg, headersOnly bool) {
}

if !headersOnly {
fmt.Println(string(msg.Data))
if !strings.HasSuffix(string(msg.Data), "\n") {
data := msg.Data
if filter != "" {
var err error
data, err = filterDataThroughCmd(msg.Data, filter, msg.Subject)
if err != nil {
fmt.Printf("%s\nError: %s\n\n", data, err.Error())
return
}
}
output := string(data)
fmt.Println(output)
if !strings.HasSuffix(output, "\n") {
fmt.Println()
}
}
Expand Down

0 comments on commit fcbc1b5

Please sign in to comment.