Skip to content

Commit

Permalink
define watching_dir for file watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Mar 5, 2024
1 parent 46d55a8 commit 9945f6a
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 63 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.0.2 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cilium/ebpf v0.9.1 // indirect
github.com/containerd/cgroups/v3 v3.0.1 // indirect
Expand Down
5 changes: 2 additions & 3 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '**\/*-json.log' # remove \
- '/var/lib/docker/containers/**\/*-json.log' # remove \
exclude:
- ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79
- '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \
offsets_file: /data/offsets.yaml
persistence_mode: async
```
Expand Down
5 changes: 2 additions & 3 deletions plugin/input/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '**\/*-json.log' # remove \
- '/var/lib/docker/containers/**\/*-json.log' # remove \
exclude:
- ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79
- '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \
offsets_file: /data/offsets.yaml
persistence_mode: async
```
Expand Down
5 changes: 2 additions & 3 deletions plugin/input/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '**\/*-json.log' # remove \
- '/var/lib/docker/containers/**\/*-json.log' # remove \
exclude:
- ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79
- '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \
offsets_file: /data/offsets.yaml
persistence_mode: async
```
Expand Down
5 changes: 2 additions & 3 deletions plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '**\/*-json.log' # remove \
- '/var/lib/docker/containers/**\/*-json.log' # remove \
exclude:
- ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79
- '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \
offsets_file: /data/offsets.yaml
persistence_mode: async
```
Expand Down
4 changes: 2 additions & 2 deletions plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
if config.DirPattern == "*" {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join("**", config.FilenamePattern),
filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
)
} else {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join(config.DirPattern, config.FilenamePattern),
filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
)
}
}
Expand Down
12 changes: 6 additions & 6 deletions plugin/input/file/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestProvierWatcherPaths(t *testing.T) {
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"**/*"},
Include: []string{"/var/log/**/*"},
},
},
{
Expand All @@ -35,7 +35,7 @@ func TestProvierWatcherPaths(t *testing.T) {
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"**/error.log"},
Include: []string{"/var/log/**/error.log"},
},
},
{
Expand All @@ -47,7 +47,7 @@ func TestProvierWatcherPaths(t *testing.T) {
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"nginx-ingress-*/error.log"},
Include: []string{"/var/log/nginx-ingress-*/error.log"},
},
},
{
Expand All @@ -58,7 +58,7 @@ func TestProvierWatcherPaths(t *testing.T) {
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"nginx-ingress-*/*"},
Include: []string{"/var/log/nginx-ingress-*/*"},
},
},
{
Expand All @@ -69,11 +69,11 @@ func TestProvierWatcherPaths(t *testing.T) {
DirPattern: "nginx-ingress-*",
OffsetsFile: "offset.json",
Paths: Paths{
Include: []string{"access.log"},
Include: []string{"/var/log/access.log"},
},
},
expectedPathes: Paths{
Include: []string{"access.log"},
Include: []string{"/var/log/access.log"},
},
},
}
Expand Down
77 changes: 57 additions & 20 deletions plugin/input/file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package file
import (
"os"
"path/filepath"
"strings"

"github.com/bmatcuk/doublestar/v4"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -11,7 +12,8 @@ import (
)

type watcher struct {
dir string // dir in which watch for files
commonPath string
basePaths []string
paths Paths
notifyFn notifyFn // function to receive notifications
watcherCh chan notify.EventInfo
Expand All @@ -33,7 +35,6 @@ func NewWatcher(
logger *zap.SugaredLogger,
) *watcher {
return &watcher{
dir: dir,
paths: paths,
notifyFn: notifyFn,
shouldWatchWrites: shouldWatchWrites,
Expand All @@ -43,9 +44,16 @@ func NewWatcher(
}

func (w *watcher) start() {
for _, pattern := range w.paths.Include {
// /var/lib/docker/containers/**/*-json.log -> /var/lib/docker/containers
basePattern, _ := doublestar.SplitPattern(pattern)
w.basePaths = append(w.basePaths, basePattern)
}
w.commonPath = commonPathPrefix(w.basePaths)

w.logger.Infof(
"starting watcher path=%s, pattern_included=%q, pattern_excluded=%q",
w.dir, w.paths.Include, w.paths.Exclude,
w.commonPath, w.paths.Include, w.paths.Exclude,
)

eventsCh := make(chan notify.EventInfo, 256)
Expand All @@ -57,7 +65,7 @@ func (w *watcher) start() {
}

// watch recursively.
err := notify.Watch(filepath.Join(w.dir, "..."), eventsCh, events...)
err := notify.Watch(filepath.Join(w.commonPath, "..."), eventsCh, events...)
if err != nil {
w.logger.Warnf("can't create fs watcher: %s", err.Error())
return
Expand All @@ -66,7 +74,33 @@ func (w *watcher) start() {

go w.watch()

w.tryAddPath(w.dir)
w.tryAddPath(w.commonPath)
}

func commonPathPrefix(paths []string) string {
results := make([][]string, 0, len(paths))
results = append(results, strings.Split(paths[0], string(os.PathSeparator)))
longest := results[0]

cmpWithLongest := func(a []string) {
if len(a) < len(longest) {
longest = longest[:len(a)]
}
for i := 0; i < len(longest); i++ {
if a[i] != longest[i] {
longest = longest[:i]
return
}
}
}

for i := 1; i < len(paths); i++ {
r := strings.Split(paths[i], string(os.PathSeparator))
results = append(results, r)
cmpWithLongest(r)
}

return filepath.Join(string(os.PathSeparator), filepath.Join(longest...))
}

func (w *watcher) stop() {
Expand All @@ -89,7 +123,8 @@ func (w *watcher) tryAddPath(path string) {
continue
}

w.notify(notify.Create, filepath.Join(path, file.Name()))
filename := filepath.Join(path, file.Name())
w.notify(notify.Create, filename)
}
}

Expand All @@ -99,19 +134,8 @@ func (w *watcher) notify(e notify.Event, path string) {
return
}

filename, err := filepath.Abs(filename)
if err != nil {
w.logger.Fatalf("can't get abs file name: %s", err.Error())
return
}

dirRel, _ := filepath.Abs(w.dir)
rel, _ := filepath.Rel(dirRel, filename)

w.logger.Infof("%s %s", e, path)

for _, pattern := range w.paths.Exclude {
match, err := doublestar.PathMatch(pattern, rel)
match, err := doublestar.PathMatch(pattern, path)
if err != nil {
w.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error())
return
Expand All @@ -127,12 +151,25 @@ func (w *watcher) notify(e notify.Event, path string) {
}

if stat.IsDir() {
w.tryAddPath(filename)
dirFilename := filename
for {
for _, path := range w.basePaths {
if path == dirFilename {
w.tryAddPath(filename)
}
}
if dirFilename == w.commonPath {
break
}
dirFilename = filepath.Dir(dirFilename)
}
return
}

w.logger.Infof("%s %s", e, path)

for _, pattern := range w.paths.Include {
match, err := doublestar.PathMatch(pattern, rel)
match, err := doublestar.PathMatch(pattern, path)
if err != nil {
w.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error())
return
Expand Down
Loading

0 comments on commit 9945f6a

Please sign in to comment.