Skip to content

Commit

Permalink
Modified to return data byte by byte.
Browse files Browse the repository at this point in the history
This is a specific hack intended for termshark and isn't really suitable for a
pull request. I want to be able to monitor a pcap file for changes and send
those changes to stdout without necessarily waiting for a newline. This is
inefficient because I send the data byte by byte, and could certaainly be
improved. But since tail has figured out EOF issues, I decided it's easier to
start here than write a binary tail program myself.
  • Loading branch information
gcla committed May 5, 2019
1 parent a1dbeea commit fb58b0d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 59 deletions.
3 changes: 1 addition & 2 deletions cmd/gotail/gotail.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func args2config() (tail.Config, int64) {
if config.ReOpen {
config.Follow = true
}
config.MaxLineSize = maxlinesize
return config, n
}

Expand Down Expand Up @@ -56,7 +55,7 @@ func tailFile(filename string, config tail.Config, done chan bool) {
fmt.Println(err)
return
}
for line := range t.Lines {
for line := range t.Bytes {
fmt.Println(line.Text)
}
err = t.Wait()
Expand Down
76 changes: 19 additions & 57 deletions tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"sync"
"time"

Expand All @@ -25,17 +24,12 @@ var (
ErrStop = errors.New("tail should now stop")
)

type Line struct {
Text string
type Chunk struct {
Text []byte
Time time.Time
Err error // Error from tail
}

// NewLine returns a Line with present time.
func NewLine(text string) *Line {
return &Line{text, time.Now(), nil}
}

// SeekInfo represents arguments to `os.Seek`
type SeekInfo struct {
Offset int64
Expand Down Expand Up @@ -65,8 +59,7 @@ type Config struct {
RateLimiter *ratelimiter.LeakyBucket

// Generic IO
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
Follow bool // Continue looking for new lines (tail -f)

// Logger, when nil, is set to tail.DefaultLogger
// To disable logging: set field to tail.DiscardingLogger
Expand All @@ -75,7 +68,7 @@ type Config struct {

type Tail struct {
Filename string
Lines chan *Line
Bytes chan *Chunk
Config

file *os.File
Expand Down Expand Up @@ -107,7 +100,7 @@ func TailFile(filename string, config Config) (*Tail, error) {

t := &Tail{
Filename: filename,
Lines: make(chan *Line),
Bytes: make(chan *Chunk),
Config: config,
}

Expand Down Expand Up @@ -173,7 +166,7 @@ func (tail *Tail) StopAtEOF() error {
var errStopAtEOF = errors.New("tail: stop at eof")

func (tail *Tail) close() {
close(tail.Lines)
close(tail.Bytes)
tail.closeFile()
}

Expand Down Expand Up @@ -207,20 +200,11 @@ func (tail *Tail) reopen() error {
return nil
}

func (tail *Tail) readLine() (string, error) {
func (tail *Tail) readLine() ([]byte, error) {
tail.lk.Lock()
line, err := tail.reader.ReadString('\n')
tail.lk.Unlock()
if err != nil {
// Note ReadString "returns the data read before the error" in
// case of an error, including EOF, so we return it as is. The
// caller is expected to process it if err is EOF.
return line, err
}

line = strings.TrimRight(line, "\n")

return line, err
defer tail.lk.Unlock()
b, err := tail.reader.ReadByte()
return []byte{b}, err
}

func (tail *Tail) tailFileSync() {
Expand Down Expand Up @@ -269,13 +253,13 @@ func (tail *Tail) tailFileSync() {

// Process `line` even if err is EOF.
if err == nil {
cooloff := !tail.sendLine(line)
cooloff := !tail.sendChunk(line)
if cooloff {
// Wait a second before seeking till the end of
// file when rate limit is reached.
msg := ("Too much log activity; waiting a second " +
"before resuming tailing")
tail.Lines <- &Line{msg, time.Now(), errors.New(msg)}
tail.Bytes <- &Chunk{[]byte(msg), time.Now(), errors.New(msg)}
select {
case <-time.After(time.Second):
case <-tail.Dying():
Expand All @@ -288,13 +272,13 @@ func (tail *Tail) tailFileSync() {
}
} else if err == io.EOF {
if !tail.Follow {
if line != "" {
tail.sendLine(line)
if len(line) > 0 {
tail.sendChunk(line)
}
return
}

if tail.Follow && line != "" {
if tail.Follow && len(line) > 0 {
// this has the potential to never return the last line if
// it's not followed by a newline; seems a fair trade here
err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
Expand Down Expand Up @@ -380,12 +364,7 @@ func (tail *Tail) waitForChanges() error {
}

func (tail *Tail) openReader() {
if tail.MaxLineSize > 0 {
// add 2 to account for newline characters
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
} else {
tail.reader = bufio.NewReader(tail.file)
}
tail.reader = bufio.NewReader(tail.file)
}

func (tail *Tail) seekEnd() error {
Expand All @@ -402,29 +381,12 @@ func (tail *Tail) seekTo(pos SeekInfo) error {
return nil
}

// sendLine sends the line(s) to Lines channel, splitting longer lines
// sendChunk sends the line(s) to Lines channel, splitting longer lines
// if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line string) bool {
func (tail *Tail) sendChunk(line []byte) bool {
now := time.Now()
lines := []string{line}

// Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = util.PartitionString(line, tail.MaxLineSize)
}

for _, line := range lines {
tail.Lines <- &Line{line, now, nil}
}

if tail.Config.RateLimiter != nil {
ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
if !ok {
tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
tail.Filename)
return false
}
}
tail.Bytes <- &Chunk{line, now, nil}

return true
}
Expand Down

0 comments on commit fb58b0d

Please sign in to comment.