file input plugin: define watching_dir from include patterns #567

Open · wants to merge 9 commits into master

1 change: 1 addition & 0 deletions go.mod
@@ -14,6 +14,7 @@ require (
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/bufbuild/protocompile v0.13.0
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
2 changes: 2 additions & 0 deletions go.sum
@@ -33,6 +33,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA=
github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
12 changes: 0 additions & 12 deletions plugin/README.md
@@ -30,18 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
12 changes: 0 additions & 12 deletions plugin/input/README.md
@@ -29,18 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
15 changes: 15 additions & 0 deletions plugin/input/file/README.idoc.md
@@ -1,6 +1,21 @@
# File plugin
@introduction

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
paths:
include:
- '/var/lib/docker/containers/**/*-json.log'
exclude:
- '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
offsets_file: /data/offsets.yaml
persistence_mode: async
```

### Config params
@config-params|description

54 changes: 33 additions & 21 deletions plugin/input/file/README.md
@@ -25,19 +25,22 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '/var/lib/docker/containers/**/*-json.log'
exclude:
- '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

### Config params
**`watching_dir`** *`string`* *`required`*
**`paths`** *`Paths`*

The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
different directories, it's recommended to setup separate pipelines for each.
The paths to watch for files to process, set in glob format:

* `include` *`[]string`*
* `exclude` *`[]string`*
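
A minimal sketch of the include/exclude semantics, using the `doublestar/v4` matcher added in `go.mod` (the `matches` helper is illustrative only; the plugin's actual matching code is not part of this diff):

```go
package main

import (
	"fmt"

	"github.com/bmatcuk/doublestar/v4"
)

// matches reports whether path satisfies at least one include pattern
// and none of the exclude patterns.
func matches(include, exclude []string, path string) bool {
	included := false
	for _, p := range include {
		if ok, _ := doublestar.Match(p, path); ok {
			included = true
			break
		}
	}
	if !included {
		return false
	}
	for _, p := range exclude {
		if ok, _ := doublestar.Match(p, path); ok {
			return false
		}
	}
	return true
}

func main() {
	include := []string{"/var/lib/docker/containers/**/*-json.log"}
	exclude := []string{"/var/lib/docker/containers/19aa5027343f4*/*-json.log"}

	// Matches the include glob and none of the exclude globs.
	fmt.Println(matches(include, exclude, "/var/lib/docker/containers/abc/abc-json.log")) // true
	// Falls under the excluded container directory.
	fmt.Println(matches(include, exclude, "/var/lib/docker/containers/19aa5027343f4abc/x-json.log")) // false
}
```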

<br>

@@ -48,20 +48,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*

It defines how to save the offsets file:
@@ -147,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```

<br>

**`watching_dir`** **Deprecated format**

The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have the
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
The `filename_pattern`/`dir_pattern` options are also useful for filtering out unneeded files/subdirectories. When using
two or more different directories, it's recommended to set up a separate pipeline for each.
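
For comparison, the docker example above can be written in the deprecated format; given the fallback in `provider.go`, it maps to the single include glob `/var/lib/docker/containers/**/*-json.log`:

```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
```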

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>


### Meta params
**`filename`**
59 changes: 30 additions & 29 deletions plugin/input/file/file.go
@@ -1,4 +1,4 @@
package file

import (
"net/http"
@@ -34,18 +34,6 @@
> By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive.

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```
}*/

type Plugin struct {
@@ -81,17 +69,22 @@
offsetsOpReset // * `reset` – resets an offset to the beginning of the file
)

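// Paths defines the include and exclude glob patterns that select which files the plugin processes.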
type Paths struct {
Include []string `json:"include"`
Exclude []string `json:"exclude"`
}

type Config struct {
// ! config-params
// ^ config-params

// > @3@4@5@6
// >
// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
// > different directories, it's recommended to setup separate pipelines for each.
WatchingDir string `json:"watching_dir" required:"true"` // *
// > The paths to watch for files to process, set in glob format:
// >
// > * `include` *`[]string`*
// > * `exclude` *`[]string`*
Paths Paths `json:"paths"` // *

// > @3@4@5@6
// >
Expand All @@ -100,18 +93,6 @@
OffsetsFile string `json:"offsets_file" required:"true"` // *
OffsetsFileTmp string

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > It defines how to save the offsets file:
@@ -184,6 +165,26 @@
// >
// > Example: ```filename: '{{ .filename }}'```
Meta cfg.MetaTemplates `json:"meta"` // *

// > **Deprecated format**
// >
// > The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have the
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > The `filename_pattern`/`dir_pattern` options are also useful for filtering out unneeded files/subdirectories. When using
// > two or more different directories, it's recommended to set up a separate pipeline for each.
WatchingDir string `json:"watching_dir"` // *

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *
}

var offsetFiles = make(map[string]string)
12 changes: 9 additions & 3 deletions plugin/input/file/info.go
@@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) {

_, _ = w.Write([]byte(jobsInfo))

watcherInfo := logger.Header("watch_dirs")
watcherInfo := logger.Header("watch_paths")
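	// List each watcher base path, followed by the watcher's common path.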
for _, source := range plugin.jobProvider.watcher.basePaths {
watcherInfo += fmt.Sprintf(
"%s\n",
source,
)
}
watcherInfo += fmt.Sprintf(
"%s\n",
plugin.jobProvider.watcher.path,
"commonPath: %s\n",
plugin.jobProvider.watcher.commonPath,
)
_, _ = w.Write([]byte(watcherInfo))

17 changes: 15 additions & 2 deletions plugin/input/file/provider.go
@@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric,
}

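	// Backward compatibility: if no include globs are configured, derive one
	// from the deprecated watching_dir, dir_pattern and filename_pattern options.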
if len(config.Paths.Include) == 0 {
if config.DirPattern == "*" {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
)
} else {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
)
}
}

jp.watcher = NewWatcher(
config.WatchingDir,
config.FilenamePattern,
config.DirPattern,
config.Paths,
jp.processNotification,
config.ShouldWatchChanges,
metrics.notifyChannelLengthMetric,
92 changes: 92 additions & 0 deletions plugin/input/file/provider_test.go
@@ -47,4 +47,96 @@
os.Remove(linkName)
jp.maintenanceSymlinks()
require.Equal(t, 0, len(jp.symlinks))
"runtime"
"testing"

"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestProvierWatcherPaths(t *testing.T) {
tests := []struct {
name string
config *Config
expectedPathes Paths
}{
{
name: "default config",
config: &Config{
WatchingDir: "/var/log/",
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"/var/log/**/*"},
},
},
{
name: "filename pattern config",
config: &Config{
WatchingDir: "/var/log/",
FilenamePattern: "error.log",
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"/var/log/**/error.log"},
},
},
{
name: "filename and dir pattern config",
config: &Config{
WatchingDir: "/var/log/",
FilenamePattern: "error.log",
DirPattern: "nginx-ingress-*",
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"/var/log/nginx-ingress-*/error.log"},
},
},
{
name: "dir pattern config",
config: &Config{
WatchingDir: "/var/log/",
DirPattern: "nginx-ingress-*",
OffsetsFile: "offset.json",
},
expectedPathes: Paths{
Include: []string{"/var/log/nginx-ingress-*/*"},
},
},
{
name: "ignore filename and dir pattern",
config: &Config{
WatchingDir: "/var/log/",
FilenamePattern: "error.log",
DirPattern: "nginx-ingress-*",
OffsetsFile: "offset.json",
Paths: Paths{
Include: []string{"/var/log/access.log"},
},
},
expectedPathes: Paths{
Include: []string{"/var/log/access.log"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctl := metric.NewCtl("test", prometheus.NewRegistry())
config := tt.config
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
metrics := newMetricCollection(
ctl.RegisterCounter("worker1", "help_test"),
ctl.RegisterCounter("worker2", "help_test"),
ctl.RegisterGauge("worker3", "help_test"),
ctl.RegisterGauge("worker4", "help_test"),
)
jp := NewJobProvider(config, metrics, &zap.SugaredLogger{})

assert.Equal(t, tt.expectedPathes, jp.watcher.paths)
})
}
}