From 266707307f5b2093df18503581d54702aa42da8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 2 Oct 2020 16:19:36 +0200 Subject: [PATCH] implementation of file prospector --- filebeat/input/filestream/active_monitor.go | 100 ++++++++++ filebeat/input/filestream/identifier.go | 139 ++++++++++++++ filebeat/input/filestream/input.go | 6 + filebeat/input/filestream/prospector.go | 201 +++++++++++++++++++- 4 files changed, 444 insertions(+), 2 deletions(-) create mode 100644 filebeat/input/filestream/active_monitor.go create mode 100644 filebeat/input/filestream/identifier.go diff --git a/filebeat/input/filestream/active_monitor.go b/filebeat/input/filestream/active_monitor.go new file mode 100644 index 00000000000..c78f8a0634e --- /dev/null +++ b/filebeat/input/filestream/active_monitor.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "sync" + "time" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/unison" +) + +// activeFileMonitor checks the state of every opened file. 
+// If a file is renamed or removed, it cancels the harvester.
+// This has to run separately from reading the file
+// to avoid blocking when the output cannot accept events.
+type activeFileMonitor struct {
+	sync.RWMutex
+	log   *logp.Logger
+	files map[string]fileSource // monitored sources keyed by path; guarded by the embedded mutex
+
+	interval     time.Duration // time between two checks of the monitored files
+	closeRenamed bool          // cancel the harvester when the path no longer refers to the same file
+	closeRemoved bool          // cancel the harvester when checking the file returns an error
+}
+
+func newActiveFileMonitor(cfg stateChangeCloserConfig) *activeFileMonitor {
+	return &activeFileMonitor{
+		log:          logp.NewLogger("active_file_monitor"),
+		files:        make(map[string]fileSource),
+		interval:     cfg.CheckInterval,
+		closeRenamed: cfg.Renamed,
+		closeRemoved: cfg.Removed,
+	}
+}
+
+func (m *activeFileMonitor) addFile(path string, src fileSource) bool {
+	m.Lock()
+	defer m.Unlock()
+
+	m.log.Debugf("Adding new file to monitor %s", path)
+	if _, ok := m.files[path]; ok {
+		return false // path is already monitored; the registered source is kept
+	}
+	m.files[path] = src
+	return true
+}
+
+func (m *activeFileMonitor) run(ctx unison.Canceler, hg *loginp.HarvesterGroup) {
+	ticker := time.NewTicker(m.interval)
+	defer ticker.Stop() // release the ticker once the monitor returns
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			m.Lock()
+			for path, src := range m.files {
+				isSame, err := isSameFile(path, src.info)
+				if m.closeBecauseRemoved(path, err) || m.closeBecauseRenamed(path, isSame) {
+					hg.Cancel(src)
+					delete(m.files, path) // deleting the current key while ranging over a map is safe in Go
+				}
+			}
+			m.Unlock()
+		}
+	}
+}
+
+func (m *activeFileMonitor) closeBecauseRemoved(path string, err error) bool {
+	mustClose := m.closeRemoved && err != nil // any error from isSameFile is treated as a removal
+	if mustClose {
+		m.log.Debugf("File %s must be closed as it has been removed and close.removed is enabled", path)
+	}
+	return mustClose
+}
+
+func (m *activeFileMonitor) closeBecauseRenamed(path string, isSame bool) bool {
+	mustClose := m.closeRenamed && !isSame // the path now refers to a different file than the one registered
+	if mustClose {
+		m.log.Debugf("File %s must be closed as it has been renamed and close.renamed is enabled", path)
+	}
+	return mustClose
+}
diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go
new file mode 100644
index
00000000000..74edb1bb509
--- /dev/null
+++ b/filebeat/input/filestream/identifier.go
@@ -0,0 +1,139 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package filestream
+
+import (
+	"fmt"
+	"os"
+
+	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
+	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/common/file"
+)
+
+const (
+	nativeName      = "native"
+	pathName        = "path"
+	inodeMarkerName = "inode_marker"
+
+	DefaultIdentifierName = nativeName // the native identifier is the default
+	identitySep           = "::"       // separates plugin name, identifier name and file identity in a source name
+)
+
+var (
+	identifierFactories = map[string]identifierFactory{
+		nativeName: newINodeDeviceIdentifier,
+		pathName:   newPathIdentifier,
+		// TODO: inodeMarkerName: newINodeMarkerIdentifier,
+	}
+)
+
+type identifierFactory func(*common.Config) (fileIdentifier, error)
+
+type fileIdentifier interface {
+	GetSource(loginp.FSEvent) fileSource
+	Name() string
+}
+
+// fileSource implements the Source interface.
+// It bundles the file info, the paths and the registry name that identify a file source.
+type fileSource struct {
+	info    os.FileInfo
+	newPath string
+	oldPath string
+
+	name                string
+	identifierGenerator string
+}
+
+// Name returns the registry identifier of the file.
+func (f fileSource) Name() string {
+	return f.name
+}
+
+// newFileIdentifier creates the file identifier selected by ns; a nil ns yields the native identifier.
+func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) {
+	if ns == nil {
+		return newINodeDeviceIdentifier(nil)
+	}
+
+	identifierType := ns.Name()
+	f, ok := identifierFactories[identifierType]
+	if !ok {
+		return nil, fmt.Errorf("no such file_identity generator: %s", identifierType)
+	}
+
+	return f(ns.Config())
+}
+
+type inodeDeviceIdentifier struct {
+	name string
+}
+
+func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) {
+	return &inodeDeviceIdentifier{
+		name: nativeName,
+	}, nil
+}
+
+func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
+	return fileSource{
+		info:                e.Info,
+		newPath:             e.NewPath,
+		oldPath:             e.OldPath,
+		name:                pluginName + identitySep + i.name + identitySep + file.GetOSState(e.Info).String(),
+		identifierGenerator: i.name,
+	}
+}
+
+func (i *inodeDeviceIdentifier) Name() string {
+	return i.name
+}
+
+type pathIdentifier struct {
+	name string
+}
+
+func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
+	return &pathIdentifier{
+		name: pathName,
+	}, nil
+}
+
+// GetSource names the source after the path of the event so the same path always maps to the same name.
+func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
+	path := e.NewPath
+	if e.Op == loginp.OpDelete {
+		path = e.OldPath // the prospector's OpDelete handling reads the removed file's path from OldPath
+	}
+	name := pluginName + identitySep + p.name + identitySep + path
+	return fileSource{info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, name: name, identifierGenerator: p.name}
+}
+
+func (p *pathIdentifier) Name() string {
+	return p.name
+}
+
+// MockIdentifier is used for testing
+type MockIdentifier struct{}
+
+func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource {
+	return fileSource{identifierGenerator: "mock"}
+}
+
+func (m *MockIdentifier) Name() string { return "mock" }
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index 487a5f01c2a..bcd143c1c5a 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -29,6 +29,12 @@
import (
 // are actively written by other applications.
 type filestream struct{}
 
+type state struct {
+	Source         string `json:"source" struct:"source"` // path of the file; stat-ed by the prospector between runs
+	Offset         int64  `json:"offset" struct:"offset"`
+	IdentifierName string `json:"identifier_name" struct:"identifier_name"` // name of the identifier that produced the registry key
+}
+
 const pluginName = "filestream"
 
 // Plugin creates a new filestream input plugin for creating a stateful input.
diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go
index 257574b9ca1..d4a6241e87e 100644
--- a/filebeat/input/filestream/prospector.go
+++ b/filebeat/input/filestream/prospector.go
@@ -18,19 +18,216 @@
 package filestream
 
 import (
+	"os"
+	"strings"
+	"time"
+
+	"github.com/urso/sderr"
+
 	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
+	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/beats/v7/libbeat/statestore"
+	"github.com/elastic/go-concert/unison"
+)
+
+const (
+	prospectorDebugKey = "file_prospector"
 )
 
 // fileProspector implements the Prospector interface.
 // It contains a file scanner which returns file system events.
 // The FS events then trigger either new Harvester runs or updates
 // the statestore.
-type fileProspector struct{}
+type fileProspector struct {
+	filewatcher  loginp.FSWatcher
+	identifier   fileIdentifier
+	ignoreOlder  time.Duration
+	cleanRemoved bool
+	monitor      *activeFileMonitor
+}
 
+// newFileProspector builds a prospector from a file watcher, a file identifier and an active file monitor.
+func newFileProspector(
+	paths []string,
+	ignoreOlder time.Duration,
+	closerConfig stateChangeCloserConfig,
+	fileWatcherNs, identifierNs *common.ConfigNamespace,
+) (loginp.Prospector, error) {
+	filewatcher, err := newFileWatcher(paths, fileWatcherNs)
+	if err != nil {
+		return nil, err
+	}
+
+	identifier, err := newFileIdentifier(identifierNs)
+	if err != nil {
+		return nil, err
+	}
+
+	return &fileProspector{
+		filewatcher:  filewatcher,
+		identifier:   identifier,
+		ignoreOlder:  ignoreOlder,
+		cleanRemoved: true, // hard-coded: this constructor takes no setting for it
+		monitor:      newActiveFileMonitor(closerConfig),
+	}, nil
+}
+
+// Run starts the fileProspector which accepts FS events from a file watcher.
 func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) {
-	panic("TODO: implement me")
+	log := ctx.Logger.With("prospector", prospectorDebugKey)
+	log.Debug("Starting prospector")
+	defer log.Debug("Prospector has stopped")
+
+	if p.cleanRemoved {
+		p.cleanRemovedBetweenRuns(log, s)
+	}
+
+	p.updateIdentifiersBetweenRuns(log, s)
+
+	var tg unison.MultiErrGroup
+
+	tg.Go(func() error {
+		p.filewatcher.Run(ctx.Cancelation)
+		return nil
+	})
+
+	tg.Go(func() error {
+		for ctx.Cancelation.Err() == nil {
+			fe := p.filewatcher.Event()
+
+			if fe.Op == loginp.OpDone {
+				return nil
+			}
+
+			src := p.identifier.GetSource(fe)
+			switch fe.Op {
+			case loginp.OpCreate:
+				log.Debugf("A new file %s has been found", fe.NewPath)
+
+				if p.ignoreOlder > 0 && time.Since(fe.Info.ModTime()) > p.ignoreOlder {
+					log.Debugf("Ignore file because ignore_older reached. File %s", fe.NewPath)
+					break // leaves the switch, so no harvester is started for this file
+				}
+
+				p.startReading(ctx, hg, src, fe.NewPath)
+
+			case loginp.OpWrite:
+				log.Debugf("File %s has been updated", fe.NewPath)
+
+				p.startReading(ctx, hg, src, fe.NewPath)
+
+			case loginp.OpDelete:
+				log.Debugf("File %s has been removed", fe.OldPath)
+
+				if p.cleanRemoved {
+					log.Debugf("Remove state for file as file removed: %s", fe.OldPath)
+
+					err := s.Remove(src.Name())
+					if err != nil {
+						log.Errorf("Error while removing state from statestore: %v", err)
+					}
+				}
+
+			case loginp.OpRename:
+				log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath)
+				// TODO update state information in the store
+
+			default:
+				log.Errorf("Unknown return value %v", fe.Op)
+			}
+		}
+		return nil
+	})
+
+	tg.Go(func() error {
+		p.monitor.run(ctx.Cancelation, hg)
+		return nil
+	})
+
+	errs := tg.Wait()
+	if len(errs) > 0 {
+		log.Errorf("%s", sderr.WrapAll(errs, "running prospector failed"))
+	}
+}
+
+// cleanRemovedBetweenRuns removes the filestream states whose source file cannot be stat-ed anymore.
+func (p *fileProspector) cleanRemovedBetweenRuns(log *logp.Logger, s *statestore.Store) {
+	keyPrefix := pluginName + identitySep
+	s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { // NOTE(review): entries are removed while iterating — confirm the store allows it
+		if !strings.HasPrefix(key, keyPrefix) {
+			return true, nil
+		}
+
+		var st state
+		if err := dec.Decode(&st); err != nil {
+			log.Errorf("Failed to read registry state for '%v', cursor state will be ignored. Error was: %+v", key, err)
+			return true, nil
+		}
+
+		if _, err := os.Stat(st.Source); err != nil {
+			if rmErr := s.Remove(key); rmErr != nil {
+				log.Errorf("Error while removing state from statestore: %v", rmErr)
+			}
+		}
+
+		return true, nil
+	})
+}
+
+// updateIdentifiersBetweenRuns re-keys filestream states written by a different identifier than the configured one.
+func (p *fileProspector) updateIdentifiersBetweenRuns(log *logp.Logger, s *statestore.Store) {
+	keyPrefix := pluginName + identitySep
+	s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
+		if !strings.HasPrefix(key, keyPrefix) {
+			return true, nil
+		}
+
+		var st state
+		if err := dec.Decode(&st); err != nil {
+			log.Errorf("Failed to read registry state for '%v', cursor state will be ignored. Error was: %+v", key, err)
+			return true, nil
+		}
+
+		if st.IdentifierName == p.identifier.Name() {
+			return true, nil
+		}
+
+		fi, err := os.Stat(st.Source)
+		if err != nil {
+			return true, nil
+		}
+		newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: st.Source, Info: fi}).Name()
+		st.IdentifierName = p.identifier.Name()
+
+		err = s.Set(newKey, st)
+		if err != nil {
+			log.Errorf("Failed to add updated state for '%v', cursor state will be ignored. Error was: %+v", key, err)
+			return true, nil
+		}
+		if rmErr := s.Remove(key); rmErr != nil {
+			log.Errorf("Error while removing state from statestore: %v", rmErr)
+		}
+
+		return true, nil
+	})
+}
+
+func (p *fileProspector) startReading(ctx input.Context, hg *loginp.HarvesterGroup, s fileSource, path string) {
+	p.monitor.addFile(path, s) // register first so the monitor can cancel the harvester later
+	hg.Run(ctx, s)
+}
+
+// isSameFile checks if the given File path corresponds with the FileInfo given
+// It is used to check if the file has been renamed.
+func isSameFile(path string, info os.FileInfo) (bool, error) {
+	fileInfo, err := os.Stat(path)
+	if err != nil {
+		return false, err
+	}
+
+	return os.SameFile(fileInfo, info), nil
 }
 
 func (p *fileProspector) Test() error {