diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go new file mode 100644 index 00000000000..736c66da2f0 --- /dev/null +++ b/filebeat/input/filestream/identifier.go @@ -0,0 +1,139 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "fmt" + "os" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" +) + +const ( + nativeName = "native" + pathName = "path" + inodeMarkerName = "inode_marker" + + DefaultIdentifierName = nativeName + identitySep = "::" +) + +var ( + identifierFactories = map[string]identifierFactory{ + nativeName: newINodeDeviceIdentifier, + pathName: newPathIdentifier, + inodeMarkerName: newINodeMarkerIdentifier, + } +) + +type identifierFactory func(*common.Config) (fileIdentifier, error) + +type fileIdentifier interface { + GetSource(loginp.FSEvent) fileSource + Name() string +} + +// fileSource implements the Source interface +// It is required to identify and manage file sources. +type fileSource struct { + info os.FileInfo + newPath string + oldPath string + + name string + identifierGenerator string +} + +// Name returns the registry identifier of the file. +func (f fileSource) Name() string { + return f.name +} + +// newFileIdentifier creates a new state identifier for a log input. +func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { + if ns == nil { + return newINodeDeviceIdentifier(nil) + } + + identifierType := ns.Name() + f, ok := identifierFactories[identifierType] + if !ok { + return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) + } + + return f(ns.Config()) +} + +type inodeDeviceIdentifier struct { + name string +} + +func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) { + return &inodeDeviceIdentifier{ + name: nativeName, + }, nil +} + +func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { + return fileSource{ + info: e.Info, + newPath: e.NewPath, + oldPath: e.OldPath, + name: pluginName + identitySep + i.name + identitySep + file.GetOSState(e.Info).String(), + identifierGenerator: i.name, + } +} + +func (i *inodeDeviceIdentifier) Name() string { + return i.name +} + +type pathIdentifier struct { + name string +} + +func newPathIdentifier(_ *common.Config) (fileIdentifier, error) { + return &pathIdentifier{ + name: pathName, + }, nil +} + +func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { + return fileSource{ + info: e.Info, + newPath: e.NewPath, + oldPath: e.OldPath, + name: pluginName + identitySep + p.name + identitySep + e.NewPath, + identifierGenerator: p.name, + } +} + +func (p *pathIdentifier) Name() string { + return p.name +} + +// mockIdentifier is used for testing +type MockIdentifier struct{} + +func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource { + return fileSource{identifierGenerator: "mock"} +} + +func (m *MockIdentifier) Name() string { return "mock" } diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go new file mode 100644 index 00000000000..25254d97fdc --- /dev/null +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -0,0 +1,108 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package filestream + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type inodeMarkerIdentifier struct { + log *logp.Logger + name string + markerPath string + + markerFileLastModifitaion time.Time + markerTxt string +} + +func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { + var config struct { + MarkerPath string `config:"path" validate:"required"` + } + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err) + } + + fi, err := os.Stat(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) + } + markerContent, err := ioutil.ReadFile(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) + } + return &inodeMarkerIdentifier{ + log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), + name: inodeMarkerName, + markerPath: config.MarkerPath, + markerFileLastModifitaion: fi.ModTime(), + markerTxt: string(markerContent), + }, nil +} + +func (i *inodeMarkerIdentifier) markerContents() string { + f, err := os.Open(i.markerPath) + if err != nil { + i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) + return "" + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) + return "" + } + if i.markerFileLastModifitaion.Before(fi.ModTime()) { + contents, err := ioutil.ReadFile(i.markerPath) + if err != nil { + i.log.Errorf("Error while reading contents of marker file: %v", err) + return "" + } + i.markerTxt = string(contents) + } + + return i.markerTxt +} + +func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { + osstate := file.GetOSState(e.Info) + return fileSource{ + info: e.Info, + newPath: e.NewPath, + oldPath: e.OldPath, + name: fmt.Sprintf("%s%s%s-%s", i.name, identitySep, osstate.InodeString(), i.markerContents()), + identifierGenerator: i.name, + } +} + +func (i *inodeMarkerIdentifier) Name() string { + return i.name +} diff --git a/filebeat/input/filestream/identifier_inode_deviceid_windows.go b/filebeat/input/filestream/identifier_inode_deviceid_windows.go new file mode 100644 index 00000000000..4ee8d866124 --- /dev/null +++ b/filebeat/input/filestream/identifier_inode_deviceid_windows.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build windows + +package filestream + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { + return nil, fmt.Errorf("inode_deviceid is not supported on Windows") +} diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 487a5f01c2a..bcd143c1c5a 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -29,6 +29,12 @@ import ( // are actively written by other applications. type filestream struct{} +type state struct { + Source string `json:"source" struct:"source"` + Offset int64 `json:"offset" struct:"offset"` + IdentifierName string `json:"identifier_name" struct:"identifier_name"` +} + const pluginName = "filestream" // Plugin creates a new filestream input plugin for creating a stateful input. diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 257574b9ca1..94670e18ce7 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -18,19 +18,191 @@ package filestream import ( + "os" + "strings" + "time" + + "github.com/urso/sderr" + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/go-concert/unison" +) + +const ( + prospectorDebugKey = "file_prospector" ) // fileProspector implements the Prospector interface. // It contains a file scanner which returns file system events. // The FS events then trigger either new Harvester runs or updates // the statestore. -type fileProspector struct{} +type fileProspector struct { + filewatcher loginp.FSWatcher + identifier fileIdentifier + ignoreOlder time.Duration + cleanRemoved bool +} + +func newFileProspector( + paths []string, + ignoreOlder time.Duration, + fileWatcherNs, identifierNs *common.ConfigNamespace, +) (loginp.Prospector, error) { + + filewatcher, err := newFileWatcher(paths, fileWatcherNs) + if err != nil { + return nil, err + } + + identifier, err := newFileIdentifier(identifierNs) + if err != nil { + return nil, err + } + + return &fileProspector{ + filewatcher: filewatcher, + identifier: identifier, + ignoreOlder: ignoreOlder, + cleanRemoved: true, + }, nil +} +// Run starts the fileProspector which accepts FS events from a file watcher. func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) { - panic("TODO: implement me") + log := ctx.Logger.With("prospector", prospectorDebugKey) + log.Debug("Starting prospector") + defer log.Debug("Prospector has stopped") + + if p.cleanRemoved { + p.cleanRemovedBetweenRuns(log, s) + } + + p.updateIdentifiersBetweenRuns(log, s) + + var tg unison.MultiErrGroup + + tg.Go(func() error { + p.filewatcher.Run(ctx.Cancelation) + return nil + }) + + tg.Go(func() error { + for ctx.Cancelation.Err() == nil { + fe := p.filewatcher.Event() + + if fe.Op == loginp.OpDone { + return nil + } + + src := p.identifier.GetSource(fe) + switch fe.Op { + case loginp.OpCreate: + log.Debugf("A new file %s has been found", fe.NewPath) + + if p.ignoreOlder > 0 { + now := time.Now() + if now.Sub(fe.Info.ModTime()) > p.ignoreOlder { + log.Debugf("Ignore file because ignore_older reached. File %s", fe.NewPath) + break + } + } + + hg.Run(ctx, src) + + case loginp.OpWrite: + log.Debugf("File %s has been updated", fe.NewPath) + + hg.Run(ctx, src) + + case loginp.OpDelete: + log.Debugf("File %s has been removed", fe.OldPath) + + if p.cleanRemoved { + log.Debugf("Remove state for file as file removed: %s", fe.OldPath) + + err := s.Remove(src.Name()) + if err != nil { + log.Errorf("Error while removing state from statestore: %v", err) + } + } + + case loginp.OpRename: + log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath) + // TODO update state information in the store + + default: + log.Error("Unkown return value %v", fe.Op) + } + } + return nil + }) + + errs := tg.Wait() + if len(errs) > 0 { + log.Error("%s", sderr.WrapAll(errs, "running prospector failed")) + } +} + +func (p *fileProspector) cleanRemovedBetweenRuns(log *logp.Logger, s *statestore.Store) { + keyPrefix := pluginName + "::" + s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(string(key), keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } + + _, err := os.Stat(st.Source) + if err != nil { + s.Remove(key) + } + + return true, nil + }) +} + +func (p *fileProspector) updateIdentifiersBetweenRuns(log *logp.Logger, s *statestore.Store) { + keyPrefix := pluginName + "::" + s.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(string(key), keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", key, err) + return true, nil + } + + if st.IdentifierName == p.identifier.Name() { + return true, nil + } + + fi, err := os.Stat(st.Source) + if err != nil { + return true, nil + } + newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: st.Source, Info: fi}).Name() + st.IdentifierName = p.identifier.Name() + + err = s.Set(newKey, st) + if err != nil { + log.Errorf("Failed to add updated state for '%v', cursor state will be ignored. Error was: %+v", key, err) + return true, nil + } + s.Remove(key) + + return true, nil + }) } func (p *fileProspector) Test() error {