Skip to content

Commit

Permalink
Separate concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 26, 2024
1 parent b53999b commit 180311c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 55 deletions.
25 changes: 15 additions & 10 deletions pkg/stanza/operator/input/windows/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,24 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
}

input := &Input{
InputOperator: inputOperator,
buffer: NewBuffer(),
channel: c.Channel,
maxReads: c.MaxReads,
startAt: c.StartAt,
pollInterval: c.PollInterval,
raw: c.Raw,
supressRenderingInfo: c.SupressRenderingInfo,
excludeProviders: excludeProvidersSet(c.ExcludeProviders),
remote: c.Remote,
InputOperator: inputOperator,
buffer: NewBuffer(),
channel: c.Channel,
maxReads: c.MaxReads,
startAt: c.StartAt,
pollInterval: c.PollInterval,
raw: c.Raw,
excludeProviders: excludeProvidersSet(c.ExcludeProviders),
remote: c.Remote,
}
input.startRemoteSession = input.defaultStartRemoteSession

if c.SupressRenderingInfo {
input.processEvent = input.processEventWithoutRenderingInfo
} else {
input.processEvent = input.processEventWithRenderingInfo
}

return input, nil
}

Expand Down
109 changes: 64 additions & 45 deletions pkg/stanza/operator/input/windows/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ import (
// Input is an operator that creates entries using the windows event log api.
type Input struct {
helper.InputOperator
bookmark Bookmark
buffer Buffer
channel string
maxReads int
startAt string
raw bool
supressRenderingInfo bool
excludeProviders map[string]struct{}
pollInterval time.Duration
persister operator.Persister
publisherCache publisherCache
cancel context.CancelFunc
wg sync.WaitGroup
subscription Subscription
remote RemoteConfig
remoteSessionHandle windows.Handle
startRemoteSession func() error
bookmark Bookmark
buffer Buffer
channel string
maxReads int
startAt string
raw bool
excludeProviders map[string]struct{}
pollInterval time.Duration
persister operator.Persister
publisherCache publisherCache
cancel context.CancelFunc
wg sync.WaitGroup
subscription Subscription
remote RemoteConfig
remoteSessionHandle windows.Handle
startRemoteSession func() error
processEvent func(context.Context, Event)
}

// newInput creates a new Input operator.
Expand Down Expand Up @@ -231,51 +231,70 @@ func (i *Input) read(ctx context.Context) int {
return len(events)
}

// processEvent will process and send an event retrieved from windows event log.
func (i *Input) processEvent(ctx context.Context, event Event) {
func (i *Input) getPublisherName(event Event) (name string, excluded bool) {
providerName, err := event.GetPublisherName(i.buffer)
if err != nil {
i.Logger().Error("Failed to get provider name", zap.Error(err))
return
return "", true
}
if _, exclude := i.excludeProviders[providerName]; exclude {
return "", true
}

return providerName, false
}

func (i *Input) processSimple(ctx context.Context, event Event) {
simpleEvent, err := event.RenderSimple(i.buffer)
if err != nil {
i.Logger().Error("Failed to render simple event", zap.Error(err))
return
}
i.sendEvent(ctx, simpleEvent)
}

if i.supressRenderingInfo {
simpleEvent, simpleErr := event.RenderSimple(i.buffer)
if simpleErr != nil {
i.Logger().Error("Failed to render simple event", zap.Error(simpleErr))
return
}
i.sendEvent(ctx, simpleEvent)
func (i *Input) processDeep(ctx context.Context, event Event, publisher Publisher) {
deepEvent, err := event.RenderDeep(i.buffer, publisher)
if err == nil {
i.sendEvent(ctx, deepEvent)
return
}
i.Logger().Error("Failed to render formatted event", zap.Error(err))
i.processSimple(ctx, event)
}

publisher, openPublisherErr := i.publisherCache.get(providerName)
if openPublisherErr != nil {
// Do not return. Log error here and try to send as simple event later.
// processEvent will process and send an event retrieved from windows event log.
func (i *Input) processEventWithoutRenderingInfo(ctx context.Context, event Event) {
if len(i.excludeProviders) == 0 {
i.processSimple(ctx, event)
return
}
if _, exclude := i.getPublisherName(event); exclude {
return
}
i.processSimple(ctx, event)
}

func (i *Input) processEventWithRenderingInfo(ctx context.Context, event Event) {
providerName, exclude := i.getPublisherName(event)
if exclude {
return
}

publisher, err := i.publisherCache.get(providerName)
if err != nil {
i.Logger().Warn(
"Failed to open event source, respective log entries cannot be formatted",
zap.String("provider", providerName), zap.Error(openPublisherErr))
} else if publisher.Valid() {
deepEvent, deepErr := event.RenderDeep(i.buffer, publisher)
if deepErr != nil {
// Do not return. Log error here and try to send as simple event later.
i.Logger().Error("Failed to render formatted event", zap.Error(deepErr))
} else {
i.sendEvent(ctx, deepEvent)
return
}
zap.String("provider", providerName), zap.Error(err))
i.processSimple(ctx, event)
return
}

// Since we coudn't render the event deeply, send it as a simple event.
simpleEvent, err := event.RenderSimple(i.buffer)
if err != nil {
i.Logger().Error("Failed to render simple event as fallback", zap.Error(err))
if publisher.Valid() {
i.processDeep(ctx, event, publisher)
return
}
i.sendEvent(ctx, simpleEvent)
i.processSimple(ctx, event)
}

// sendEvent will send EventXML as an entry to the operator's output.
Expand Down

0 comments on commit 180311c

Please sign in to comment.