Skip to content

Commit

Permalink
Merge pull request #495 from nats-io/pdp/repair-sub-count
Browse files Browse the repository at this point in the history
Honor sub count limits for raw/dump
  • Loading branch information
ripienaar committed Jun 10, 2022
2 parents 1608c05 + 2fe2ee3 commit a02afd5
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {

ctr++
if dump {
// Output format 1/3: dumping, to stdout or files

stdout := c.dump == "-"
outFile := ""
if !stdout {
Expand All @@ -167,10 +169,7 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
jm, err := json.Marshal(msg)
if err != nil {
log.Printf("Could not JSON encode message: %s", err)
return
}

if stdout {
} else if stdout {
os.Stdout.WriteString(fmt.Sprintf("%s\000", jm))
} else {
err = os.WriteFile(outFile, jm, 0600)
Expand All @@ -183,40 +182,44 @@ func (c *subCmd) subscribe(_ *fisk.ParseContext) error {
}
}

return
} else if c.raw {
// Output format 2/3: raw

fmt.Println(string(m.Data))
return
}

if info == nil {
if m.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, m.Subject, m.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", ctr, m.Subject)
}
} else if jetStream {
fmt.Printf("[#%d] Received JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), m.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}
// Output format 3/3: pretty

if len(m.Header) > 0 {
for h, vals := range m.Header {
for _, val := range vals {
fmt.Printf("%s: %s\n", h, val)
if info == nil {
if m.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, m.Subject, m.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", ctr, m.Subject)
}
} else if jetStream {
fmt.Printf("[#%d] Received JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), m.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

fmt.Println()
}
if len(m.Header) > 0 {
for h, vals := range m.Header {
for _, val := range vals {
fmt.Printf("%s: %s\n", h, val)
}
}

if !c.headersOnly {
fmt.Println(string(m.Data))
if !strings.HasSuffix(string(m.Data), "\n") {
fmt.Println()
}
}

if !c.headersOnly {
fmt.Println(string(m.Data))
if !strings.HasSuffix(string(m.Data), "\n") {
fmt.Println()
}
}

} // output format type dispatch

if ctr == c.limit {
sub.Unsubscribe()
Expand Down

0 comments on commit a02afd5

Please sign in to comment.