Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start_time and minimum_execution_duration fields to actions #1381

Merged
merged 9 commits into from
May 24, 2022
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ linters:
- goconst # finds repeated strings that could be replaced by a constant
- dupl # tool for code clone detection
- forbidigo # forbids identifiers matched by reg exps
- gomoddirectives # manage the use of 'replace', 'retract', and 'excludes' directives in go.mod.
#- gomoddirectives # manage the use of 'replace', 'retract', and 'excludes' directives in go.mod.
- gosimple # linter for Go source code that specializes in simplifying a code
- misspell # finds commonly misspelled English words in comments
- nakedret # finds naked returns in functions greater than a specified function length
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@

- Fleet server now supports Logstash type outputs for managed agents. {pull}1188[1188]
- Use the darwin-aarch64 as the suffix for Darwin Arm64 release {pull}1267[1267]
- Add start_time and minimum_execution_duration to actions to allow fleet-server to schedule agent actions. {pull}1381[1381]
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.2.0 // indirect
go.elastic.co/apm/module/apmgrpc v1.15.0 // indirect
go.elastic.co/apm/module/apmhttp v1.15.0 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
Expand Down
67 changes: 50 additions & 17 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package action
import (
"context"
"sync"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
Expand All @@ -18,7 +19,7 @@ import (
)

type Sub struct {
agentId string
agentID string
seqNo sqn.SeqNo
ch chan []model.Action
}
Expand Down Expand Up @@ -52,21 +53,21 @@ func (d *Dispatcher) Run(ctx context.Context) (err error) {
}
}

func (d *Dispatcher) Subscribe(agentId string, seqNo sqn.SeqNo) *Sub {
func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
cbCh := make(chan []model.Action, 1)

sub := Sub{
agentId: agentId,
agentID: agentID,
seqNo: seqNo,
ch: cbCh,
}

d.mx.Lock()
d.subs[agentId] = sub
d.subs[agentID] = sub
sz := len(d.subs)
d.mx.Unlock()

log.Trace().Str(logger.AgentId, agentId).Int("sz", sz).Msg("Subscribed to action dispatcher")
log.Trace().Str(logger.AgentId, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")

return &sub
}
Expand All @@ -77,11 +78,11 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) {
}

d.mx.Lock()
delete(d.subs, sub.agentId)
delete(d.subs, sub.agentID)
sz := len(d.subs)
d.mx.Unlock()

log.Trace().Str(logger.AgentId, sub.agentId).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
log.Trace().Str(logger.AgentId, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
}

func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
Expand All @@ -96,31 +97,63 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
log.Error().Err(err).Msg("Failed to unmarshal action document")
break
}
for _, agentId := range action.Agents {
arr := agentActions[agentId]
numAgents := len(action.Agents)
for i, agentID := range action.Agents {
arr := agentActions[agentID]
actionNoAgents := action
actionNoAgents.StartTime = offsetStartTime(action.StartTime, action.Expiration, action.MinimumExecutionDuration, i, numAgents)
actionNoAgents.Agents = nil
arr = append(arr, actionNoAgents)
agentActions[agentId] = arr
agentActions[agentID] = arr
}
}

for agentId, actions := range agentActions {
d.dispatch(ctx, agentId, actions)
for agentID, actions := range agentActions {
d.dispatch(ctx, agentID, actions)
}
}

func (d *Dispatcher) getSub(agentId string) (Sub, bool) {
// offsetStartTime returns a new start time between start and exp-dur, chosen by index i out of the total number of agents.
// An empty string is returned if start or exp is empty, or if any of the inputs fails to parse.
// As we expect i < total, the latest returned time will always be earlier than exp-dur.
func offsetStartTime(start, exp, dur string, i, total int) string {
if start == "" || exp == "" {
return ""
}
startTS, err := time.Parse(time.RFC3339, start) // TODO what format does a date-time string use?
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we move the validation to the original unpacking of the action instead? If there is a deserialization error, it means that the contract between fleet and fleet-server is broken. I don't think we should even try to process that event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To summarize a quick conversation: we should catch this during JSON unpacking; however, model.Action is generated by elastic/go-json-schema-generate, which does not support mapping a (JSON) string/date-time to Go's time.Time. The upstream generator we forked our code from has this as an open issue (a-h/generate#69), so if we get around to it, we should contribute the fix back.

if err != nil {
log.Error().Err(err).Msg("unable to parse start_time string")
return ""
}
expTS, err := time.Parse(time.RFC3339, exp)
if err != nil {
log.Error().Err(err).Msg("unable to parse expiration string")
return ""
}
var d time.Duration
if dur != "" {
d, err = time.ParseDuration(dur)
if err != nil {
log.Error().Err(err).Msg("unable to parse minimum_execution_period string")
return ""
}
}
d = expTS.Add(-1 * d).Sub(startTS) // the valid scheduling range is: d = exp - dur - start
startTS = startTS.Add((d * time.Duration(i)) / time.Duration(total)) // adjust start to a position within the range
return startTS.Format(time.RFC3339)
}

func (d *Dispatcher) getSub(agentID string) (Sub, bool) {
d.mx.RLock()
sub, ok := d.subs[agentId]
sub, ok := d.subs[agentID]
d.mx.RUnlock()
return sub, ok
}

func (d *Dispatcher) dispatch(ctx context.Context, agentId string, acdocs []model.Action) {
sub, ok := d.getSub(agentId)
func (d *Dispatcher) dispatch(_ context.Context, agentID string, acdocs []model.Action) {
sub, ok := d.getSub(agentID)
if !ok {
log.Debug().Str(logger.AgentId, agentId).Msg("Agent is not currently connected. Not dispatching actions.")
log.Debug().Str(logger.AgentId, agentID).Msg("Agent is not currently connected. Not dispatching actions.")
return
}
select {
Expand Down
Loading