Support for Elastic Agent V2 status #1747

Merged (4 commits) on Aug 16, 2022. Changes from 3 commits.
64 changes: 62 additions & 2 deletions internal/pkg/api/handleCheckin.go
@@ -193,6 +193,12 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
return err
}

// Compare agent_components content and update if different
rawComponents, err := parseComponents(zlog, agent, &req)
if err != nil {
return err
}

// Resolve AckToken from request, fallback on the agent record
seqno, err := ct.resolveSeqNo(ctx, zlog, req, agent)
if err != nil {
@@ -237,7 +243,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver)
err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, rawComponents, seqno, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
@@ -277,7 +283,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver)
err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, rawComponents, nil, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
@@ -555,6 +561,60 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]
return outMeta, nil
}

func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, error) {

// Quick comparison first; compare the JSON payloads.
// If the data is not consistently normalized, this short-circuit will not work.
if bytes.Equal(req.Components, agent.Components) {
zlog.Trace().Msg("quick comparing agent components data is equal")
return nil, nil
}
Contributor:

I always forget whether bytes.Equal checks the length before doing the byte-by-byte comparison; I think it does.

Member Author:

This approach is copied from parseMeta and works fine for nils. It appears the compiler optimizes for this as well:
https://cs.opensource.google/go/go/+/refs/tags/go1.19:src/bytes/bytes.go;l=19


// Deserialize the request components data
var reqComponents interface{}
if len(req.Components) > 0 {
if err := json.Unmarshal(req.Components, &reqComponents); err != nil {
return nil, errors.Wrap(err, "parseComponents request")
}
// Validate that components is an array
if _, ok := reqComponents.([]interface{}); !ok {
return nil, errors.Wrap(errors.New("components property is not array"), "parseComponents request")
}
}

// If empty, don't step on existing data
if reqComponents == nil {
return nil, nil
}
Contributor:

@aleksmaus We should probably validate the data we receive from Elastic Agent before sending it to Elasticsearch. Maybe we should also put some kind of limit, for example a size-based limit on the payload field, to reduce the risk of document-size explosion?

Member Author:

This approach is basically copied from parseMeta, which currently handles "local_metadata", another blob the agent can send as-is to the fleet server. There is already a limit check on the checkin body size (default is defaultCheckinMaxBody = 1024 * 1024), and parseComponents already validates that the payload is parsable JSON; I can add a check that components is an array.

Do we need a field-by-field check? What guarantees are there for which fields are required vs optional?

aleksmaus (Member Author), Aug 16, 2022:

Will add strongly typed parsing for the "components" property tomorrow. Only fields specified in V2 at the moment will be stored for the components; it will need to be extended for anything extra in the future.

Contributor:

Yes, once everything is V2 we should be able to be stricter about incoming data; I'm not sure we can do that during the transition period.

Member Author:

Sounds good. Will leave it as is for now until we stabilize the schema; we can tighten validation later.
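The array check discussed in this thread can be factored into a small predicate. A hypothetical helper (isJSONArray is not in the PR) mirroring the validation that parseComponents performs inline:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// isJSONArray reports whether raw parses as a JSON array.
// Hypothetical helper; parseComponents performs the same check inline.
func isJSONArray(raw []byte) bool {
	if len(raw) == 0 {
		return false
	}
	var v interface{}
	if err := json.Unmarshal(raw, &v); err != nil {
		return false
	}
	// json.Unmarshal decodes a JSON array into []interface{}.
	_, ok := v.([]interface{})
	return ok
}

func main() {
	fmt.Println(isJSONArray([]byte(`[{"id":"winlog-default"}]`))) // true
	fmt.Println(isJSONArray([]byte(`{"id":"winlog-default"}`)))   // false: object, not array
	fmt.Println(isJSONArray(nil))                                 // false: empty input
}
```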


// Deserialize the agent's components copy
var agentComponents interface{}
if len(agent.Components) > 0 {
if err := json.Unmarshal(agent.Components, &agentComponents); err != nil {
return nil, errors.Wrap(err, "parseComponents local")
}
}

var outComponents []byte

// Compare the deserialized components structures and return the bytes to update if different
if !reflect.DeepEqual(reqComponents, agentComponents) {

zlog.Trace().
RawJSON("oldComponents", agent.Components).
RawJSON("newComponents", req.Components).
Msg("local components data is not equal")

zlog.Info().
RawJSON("req.Components", req.Components).
Msg("applying new components data")

outComponents = req.Components
}

return outComponents, nil
}

func calcPollDuration(zlog zerolog.Logger, cfg *config.Server, setupDuration time.Duration) (time.Duration, time.Duration) {

pollDuration := cfg.Timeouts.CheckinLongPoll
10 changes: 6 additions & 4 deletions internal/pkg/api/schema.go
@@ -78,10 +78,12 @@ type EnrollResponse struct {
}

type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []Event `json:"events"`
LocalMeta json.RawMessage `json:"local_metadata"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []Event `json:"events"`
LocalMeta json.RawMessage `json:"local_metadata"`
Message string `json:"message"` // V2 Agent message
Components json.RawMessage `json:"components,omitempty"` // V2 Agent components
}

type CheckinResponse struct {
51 changes: 31 additions & 20 deletions internal/pkg/checkin/bulk.go
@@ -33,18 +33,20 @@ func WithFlushInterval(d time.Duration) Opt {
}

type extraT struct {
meta []byte
seqNo sqn.SeqNo
ver string
meta []byte
seqNo sqn.SeqNo
ver string
components []byte
}

// Minimize the size of this structure.
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
status string
extra *extraT
ts string
status string
message string
extra *extraT
}

// Bulk will batch pending checkins and update elasticsearch at a set interval.
@@ -98,25 +100,27 @@ func (bc *Bulk) timestamp() string {
// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error {
func (bc *Bulk) CheckIn(id string, status, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
var extra *extraT
if meta != nil || seqno.IsSet() || newVer != "" {
if meta != nil || seqno.IsSet() || newVer != "" || components != nil {
extra = &extraT{
meta: meta,
seqNo: seqno,
ver: newVer,
meta: meta,
seqNo: seqno,
ver: newVer,
components: components,
}
}

bc.mut.Lock()

bc.pending[id] = pendingT{
ts: bc.timestamp(),
status: status,
extra: extra,
ts: bc.timestamp(),
status: status,
message: message,
extra: extra,
}

bc.mut.Unlock()
@@ -180,9 +184,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
}
if body, err = fields.Marshal(); err != nil {
return err
} else {

fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
}

// If the agent version is not empty it needs to be updated
fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
}

// Update components if provided
if pendingData.extra.components != nil {
fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
}

// If seqNo changed, set the field appropriately
if pendingData.extra.seqNo.IsSet() {
fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
25 changes: 17 additions & 8 deletions internal/pkg/checkin/bulk_test.go
@@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b
}

type bulkcase struct {
desc string
id string
status string
meta []byte
seqno sqn.SeqNo
ver string
desc string
id string
status string
meta []byte
components []byte
seqno sqn.SeqNo
ver string
}

func TestBulkSimple(t *testing.T) {
"online",
nil,
nil,
nil,
"",
},
{
"Singled field case",
"singleFieldId",
"online",
[]byte(`{"hey":"now"}`),
[]byte(`[{"id":"winlog-default"}]`),
nil,
"",
},
"multiFieldId",
"online",
[]byte(`{"hey":"now","brown":"cow"}`),
[]byte(`[{"id":"winlog-default","type":"winlog"}]`),
nil,
ver,
},
"multiFieldNestedId",
"online",
[]byte(`{"hey":"now","wee":{"little":"doggie"}}`),
[]byte(`[{"id":"winlog-default","type":"winlog"}]`),
nil,
"",
},
"simpleseqno",
"online",
nil,
nil,
sqn.SeqNo{1, 2, 3, 4},
ver,
},
"simpleseqno",
"online",
[]byte(`{"uncle":"fester"}`),
[]byte(`[{"id":"log-default"}]`),
sqn.SeqNo{5, 6, 7, 8},
ver,
},
"unusual",
nil,
nil,
nil,
"",
},
{
"",
nil,
nil,
nil,
"",
},
}
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()
bc := NewBulk(mockBulk)

if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil {
if err := bc.CheckIn(c.id, c.status, "", c.meta, c.components, c.seqno, c.ver); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) {
for i := 0; i < b.N; i++ {

for _, id := range ids {
err := bc.CheckIn(id, "", nil, nil, "")
err := bc.CheckIn(id, "", "", nil, nil, nil, "")
if err != nil {
b.Fatal(err)
}
2 changes: 2 additions & 0 deletions internal/pkg/dl/constants.go
@@ -33,7 +33,9 @@ const (
FieldCoordinatorIdx = "coordinator_idx"
FieldLastCheckin = "last_checkin"
FieldLastCheckinStatus = "last_checkin_status"
FieldLastCheckinMessage = "last_checkin_message"
FieldLocalMetadata = "local_metadata"
FieldComponents = "components"
FieldPolicyRevisionIdx = "policy_revision_idx"
FieldPolicyCoordinatorIdx = "policy_coordinator_idx"
FieldDefaultAPIKey = "default_api_key"
10 changes: 10 additions & 0 deletions internal/pkg/model/schema.go

(Generated file; diff not rendered.)

9 changes: 9 additions & 0 deletions model/schema.json
@@ -454,6 +454,15 @@
"description": "Last checkin status",
"type": "string"
},
"last_checkin_message": {
"description": "Last checkin message",
"type": "string"
},
"components": {
Contributor:

@joshdover Do we want to store the status, message, and components as they change over time in a data stream, so we can track the changes of an Elastic Agent over time? I think it would be beneficial from an alerting standpoint, but from a scalability standpoint it would add more data and load.

"description": "Elastic Agent components detailed status information",
"type": "object",
"format": "raw"
},
"default_api_key_id": {
"description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch",
"type": "string"