Skip to content

Commit

Permalink
implementation of file prospector
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Oct 5, 2020
1 parent c912167 commit 2667073
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 2 deletions.
100 changes: 100 additions & 0 deletions filebeat/input/filestream/active_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"sync"
"time"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// activeFileMonitor checks the state of every opened file.
// If a file is renamed or removed, it cancels the harvester.
// This has to run separately from reading the file
// to avoid blocking when the output cannot accept events.
type activeFileMonitor struct {
// The embedded mutex guards files; addFile and run both take the write lock.
sync.RWMutex
log *logp.Logger
// files maps a monitored path to the source that was registered for it.
files map[string]fileSource

// interval is the period of the ticker that triggers the checks in run.
interval time.Duration
// closeRenamed enables cancelling a harvester when isSameFile reports
// that the path no longer refers to the same file.
closeRenamed bool
// closeRemoved enables cancelling a harvester when isSameFile returns
// an error for the path.
closeRemoved bool
}

// newActiveFileMonitor builds a monitor with an empty set of files. The check
// interval and the close-on-rename / close-on-remove switches are taken from cfg.
func newActiveFileMonitor(cfg stateChangeCloserConfig) *activeFileMonitor {
	monitor := new(activeFileMonitor)
	monitor.log = logp.NewLogger("active_file_monitor")
	monitor.files = map[string]fileSource{}
	monitor.interval = cfg.CheckInterval
	monitor.closeRenamed = cfg.Renamed
	monitor.closeRemoved = cfg.Removed
	return monitor
}

// addFile registers src under path so that run starts checking it.
// It returns true when the file was added and false when the path is
// already monitored, in which case the existing entry is left untouched.
func (m *activeFileMonitor) addFile(path string, src fileSource) bool {
	m.Lock()
	defer m.Unlock()

	// Debugf rather than Debug: the message carries a format verb that
	// must be expanded with the path.
	m.log.Debugf("Adding new file to monitor %s", path)
	if _, ok := m.files[path]; ok {
		return false
	}
	m.files[path] = src
	return true
}

// run checks every monitored file once per interval until ctx is done.
// When a file has to be closed because it was removed or renamed (and the
// matching option is enabled), its harvester is cancelled through hg and the
// file is dropped from the monitor.
func (m *activeFileMonitor) run(ctx unison.Canceler, hg *loginp.HarvesterGroup) {
	ticker := time.NewTicker(m.interval)
	// Stop the ticker on return so its resources are released once
	// monitoring ends.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.Lock()
			for path, src := range m.files {
				isSame, err := isSameFile(path, src.info)
				if m.closeBecauseRemoved(path, err) || m.closeBecauseRenamed(path, isSame) {
					hg.Cancel(src)
					// Deleting the current key while ranging over a map is safe in Go.
					delete(m.files, path)
				}
			}
			m.Unlock()
		}
	}
}

// closeBecauseRemoved reports whether the file at path has to be closed:
// this is the case when close-on-remove is enabled and err is non-nil.
// A positive decision is logged at debug level.
func (m *activeFileMonitor) closeBecauseRemoved(path string, err error) bool {
	if !m.closeRemoved || err == nil {
		return false
	}

	m.log.Debugf("File %s must be closed as it has been removed and close.removed is enabled", path)
	return true
}

// closeBecauseRenamed reports whether the file at path has to be closed:
// this is the case when close-on-rename is enabled and isSame is false.
// A positive decision is logged at debug level.
func (m *activeFileMonitor) closeBecauseRenamed(path string, isSame bool) bool {
	if !m.closeRenamed || isSame {
		return false
	}

	m.log.Debugf("File %s must be closed as it has been renamed and close.renamed is enabled", path)
	return true
}
139 changes: 139 additions & 0 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"fmt"
"os"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
)

const (
// Names of the file identity generators. inodeMarkerName is declared
// but not yet registered in identifierFactories.
nativeName = "native"
pathName = "path"
inodeMarkerName = "inode_marker"

// DefaultIdentifierName is the name of the default file identity generator.
DefaultIdentifierName = nativeName
// identitySep separates the parts of the registry names built by GetSource.
identitySep = "::"
)

var (
// identifierFactories maps the name of a file identity generator
// to the factory that creates it.
identifierFactories = map[string]identifierFactory{
nativeName: newINodeDeviceIdentifier,
pathName: newPathIdentifier,
// TODO: register inodeMarkerName: newINodeMarkerIdentifier
}
)

// identifierFactory creates a fileIdentifier from its configuration.
type identifierFactory func(*common.Config) (fileIdentifier, error)

// fileIdentifier turns file system events into file sources
// and reports the name of the identity generator it implements.
type fileIdentifier interface {
// GetSource returns the fileSource describing the file of the given event.
GetSource(loginp.FSEvent) fileSource
// Name returns the name of the identity generator.
Name() string
}

// fileSource implements the Source interface.
// NOTE(review): presumably the Source interface of the input-logfile package — confirm.
// It is required to identify and manage file sources.
type fileSource struct {
// info, newPath and oldPath are copied from the file system event
// the source was created from.
info os.FileInfo
newPath string
oldPath string

// name is the registry identifier returned by Name.
name string
// identifierGenerator is the name of the fileIdentifier that created the source.
identifierGenerator string
}

// Name returns the registry identifier of the file.
// The value is whatever the fileIdentifier that created the source stored
// in the name field; it is empty if that field was left unset.
func (f fileSource) Name() string {
return f.name
}

// newFileIdentifier creates the file identifier selected by ns.
// A nil namespace yields the inode/device based identifier. Otherwise the
// factory registered under the namespace name is invoked with the namespace
// configuration; an unregistered name results in an error.
func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) {
	if ns == nil {
		return newINodeDeviceIdentifier(nil)
	}

	generator := ns.Name()
	if factory, found := identifierFactories[generator]; found {
		return factory(ns.Config())
	}

	return nil, fmt.Errorf("no such file_identity generator: %s", generator)
}

// inodeDeviceIdentifier is the fileIdentifier registered as "native".
// It builds registry names from the string form of the OS state that
// file.GetOSState derives from a file's info.
type inodeDeviceIdentifier struct {
// name is the identifier name reported by Name and embedded in source names.
name string
}

// newINodeDeviceIdentifier returns the identifier named nativeName.
// The configuration argument is ignored and the returned error is always nil.
func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) {
	identifier := &inodeDeviceIdentifier{name: nativeName}
	return identifier, nil
}

// GetSource describes the file of event e. The registry name consists of the
// plugin name, the identifier name and the string form of the OS state
// derived from the file info, joined by identitySep.
func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
	osState := file.GetOSState(e.Info).String()

	var src fileSource
	src.info = e.Info
	src.newPath = e.NewPath
	src.oldPath = e.OldPath
	src.name = pluginName + identitySep + i.name + identitySep + osState
	src.identifierGenerator = i.name
	return src
}

// Name returns the name of the identifier, as set by its constructor.
func (i *inodeDeviceIdentifier) Name() string {
return i.name
}

// pathIdentifier is the fileIdentifier registered as "path".
// It builds registry names from the new path of a file system event.
type pathIdentifier struct {
// name is the identifier name reported by Name and embedded in source names.
name string
}

// newPathIdentifier returns the identifier named pathName.
// The configuration argument is ignored and the returned error is always nil.
func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
	identifier := &pathIdentifier{name: pathName}
	return identifier, nil
}

// GetSource describes the file of event e. The registry name consists of the
// plugin name, the identifier name and the new path of the event, joined by
// identitySep.
func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
	var src fileSource
	src.info = e.Info
	src.newPath = e.NewPath
	src.oldPath = e.OldPath
	src.name = pluginName + identitySep + p.name + identitySep + e.NewPath
	src.identifierGenerator = p.name
	return src
}

// Name returns the name of the identifier, as set by its constructor.
func (p *pathIdentifier) Name() string {
return p.name
}

// MockIdentifier is a fileIdentifier stub used for testing.
// The sources it returns carry only the generator name "mock".
type MockIdentifier struct{}

// GetSource ignores the event and returns a source whose only populated
// field is the generator name "mock".
func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource {
	var src fileSource
	src.identifierGenerator = "mock"
	return src
}

// Name returns the fixed identifier name "mock".
func (m *MockIdentifier) Name() string { return "mock" }
6 changes: 6 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
// are actively written by other applications.
type filestream struct{}

// state is the serializable per-file state of the input.
// NOTE(review): presumably what gets persisted in the registry — confirm against the code that stores it.
type state struct {
// Source — presumably the path of the file the state belongs to; TODO confirm.
Source string `json:"source" struct:"source"`
// Offset — presumably the read position in the file, in bytes; TODO confirm.
Offset int64 `json:"offset" struct:"offset"`
// IdentifierName — presumably the name of the fileIdentifier that created the source; TODO confirm.
IdentifierName string `json:"identifier_name" struct:"identifier_name"`
}

// pluginName is the name of the filestream input plugin. The file identifiers
// also use it as the leading part of the registry names they build.
const pluginName = "filestream"

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down
Loading

0 comments on commit 2667073

Please sign in to comment.