From eb8eca7e41256fc21c908340ee06b8c8175e43aa Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 16 Aug 2022 09:04:46 -0400 Subject: [PATCH] Support for Elastic Agent V2 status (#1747) * Support for Elastic Agent V2 status * Make 'make check-ci' happy * Add a check that 'components' is a valid array * Rename variable to better reflect its meaning --- internal/pkg/api/handleCheckin.go | 64 ++++++++++++++++++++++++++++++- internal/pkg/api/schema.go | 10 +++-- internal/pkg/checkin/bulk.go | 51 ++++++++++++++---------- internal/pkg/checkin/bulk_test.go | 25 ++++++++---- internal/pkg/dl/constants.go | 2 + internal/pkg/model/schema.go | 10 +++++ model/schema.json | 9 +++++ 7 files changed, 137 insertions(+), 34 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 721ee4538..d3c1323e4 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -193,6 +193,12 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r return err } + // Compare agent_components content and update if different + rawComponents, err := parseComponents(zlog, agent, &req) + if err != nil { + return err + } + // Resolve AckToken from request, fallback on the agent record seqno, err := ct.resolveSeqNo(ctx, zlog, req, agent) if err != nil { @@ -237,7 +243,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r defer longPoll.Stop() // Initial update on checkin, and any user fields that might have changed - err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver) + err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, rawComponents, seqno, ver) if err != nil { zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") } @@ -277,7 +283,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r zlog.Trace().Msg("fire long poll") break LOOP case <-tick.C: - err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver)
+ err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, rawComponents, nil, ver) if err != nil { zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") } @@ -555,6 +561,60 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] return outMeta, nil } +func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, error) { + + // Quick comparison first; compare the JSON payloads. + // If the data is not consistently normalized, this short-circuit will not work. + if bytes.Equal(req.Components, agent.Components) { + zlog.Trace().Msg("quick comparing agent components data is equal") + return nil, nil + } + + // Deserialize the request components data + var reqComponents interface{} + if len(req.Components) > 0 { + if err := json.Unmarshal(req.Components, &reqComponents); err != nil { + return nil, errors.Wrap(err, "parseComponents request") + } + // Validate that components is an array + if _, ok := reqComponents.([]interface{}); !ok { + return nil, errors.Wrap(errors.New("components property is not array"), "parseComponents request") + } + } + + // If empty, don't step on existing data + if reqComponents == nil { + return nil, nil + } + + // Deserialize the agent's components copy + var agentComponents interface{} + if len(agent.Components) > 0 { + if err := json.Unmarshal(agent.Components, &agentComponents); err != nil { + return nil, errors.Wrap(err, "parseComponents local") + } + } + + var outComponents []byte + + // Compare the deserialized components structures and return the bytes to update if different + if !reflect.DeepEqual(reqComponents, agentComponents) { + + zlog.Trace(). + RawJSON("oldComponents", agent.Components). + RawJSON("newComponents", req.Components). + Msg("local components data is not equal") + + zlog.Info(). + RawJSON("req.Components", req.Components).
+ Msg("applying new components data") + + outComponents = req.Components + } + + return outComponents, nil +} + func calcPollDuration(zlog zerolog.Logger, cfg *config.Server, setupDuration time.Duration) (time.Duration, time.Duration) { pollDuration := cfg.Timeouts.CheckinLongPoll diff --git a/internal/pkg/api/schema.go b/internal/pkg/api/schema.go index 7a7e3a0f3..5d04d563e 100644 --- a/internal/pkg/api/schema.go +++ b/internal/pkg/api/schema.go @@ -78,10 +78,12 @@ type EnrollResponse struct { } type CheckinRequest struct { - Status string `json:"status"` - AckToken string `json:"ack_token,omitempty"` - Events []Event `json:"events"` - LocalMeta json.RawMessage `json:"local_metadata"` + Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` + Events []Event `json:"events"` + LocalMeta json.RawMessage `json:"local_metadata"` + Message string `json:"message"` // V2 Agent message + Components json.RawMessage `json:"components,omitempty"` // V2 Agent components } type CheckinResponse struct { diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 3d510c8f0..8ece0547f 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -33,18 +33,20 @@ func WithFlushInterval(d time.Duration) Opt { } type extraT struct { - meta []byte - seqNo sqn.SeqNo - ver string + meta []byte + seqNo sqn.SeqNo + ver string + components []byte } // Minimize the size of this structure. // There will be 10's of thousands of items // in the map at any point. type pendingT struct { - ts string - status string - extra *extraT + ts string + status string + message string + extra *extraT } // Bulk will batch pending checkins and update elasticsearch at a set interval. @@ -98,25 +100,27 @@ func (bc *Bulk) timestamp() string { // CheckIn will add the agent (identified by id) to the pending set. // The pending agents are sent to elasticsearch as a bulk update at each flush interval. 
// WARNING: Bulk will take ownership of fields, so do not use after passing in. -func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error { +func (bc *Bulk) CheckIn(id string, status, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string) error { // Separate out the extra data to minimize // the memory footprint of the 90% case of just // updating the timestamp. var extra *extraT - if meta != nil || seqno.IsSet() || newVer != "" { + if meta != nil || seqno.IsSet() || newVer != "" || components != nil { extra = &extraT{ - meta: meta, - seqNo: seqno, - ver: newVer, + meta: meta, + seqNo: seqno, + ver: newVer, + components: components, } } bc.mut.Lock() bc.pending[id] = pendingT{ - ts: bc.timestamp(), - status: status, - extra: extra, + ts: bc.timestamp(), + status: status, + message: message, + extra: extra, } bc.mut.Unlock() @@ -180,9 +184,10 @@ func (bc *Bulk) flush(ctx context.Context) error { body, ok = simpleCache[pendingData] if !ok { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, - dl.FieldUpdatedAt: nowTimestamp, - dl.FieldLastCheckinStatus: pendingData.status, + dl.FieldLastCheckin: pendingData.ts, + dl.FieldUpdatedAt: nowTimestamp, + dl.FieldLastCheckinStatus: pendingData.status, + dl.FieldLastCheckinMessage: pendingData.message, } if body, err = fields.Marshal(); err != nil { return err @@ -192,9 +197,10 @@ func (bc *Bulk) flush(ctx context.Context) error { } else { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp - dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp - dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status + dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp + dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp + dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status + dl.FieldLastCheckinMessage: 
pendingData.message, // Set the status message } // If the agent version is not empty it needs to be updated @@ -213,6 +219,11 @@ func (bc *Bulk) flush(ctx context.Context) error { fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta) } + // Update components if provided + if pendingData.extra.components != nil { + fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components) + } + // If seqNo changed, set the field appropriately if pendingData.extra.seqNo.IsSet() { fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index 4254c1c89..af18eefd8 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b } type bulkcase struct { - desc string - id string - status string - meta []byte - seqno sqn.SeqNo - ver string + desc string + id string + status string + meta []byte + components []byte + seqno sqn.SeqNo + ver string } func TestBulkSimple(t *testing.T) { @@ -98,6 +99,7 @@ func TestBulkSimple(t *testing.T) { "online", nil, nil, + nil, "", }, { @@ -105,6 +107,7 @@ func TestBulkSimple(t *testing.T) { "singleFieldId", "online", []byte(`{"hey":"now"}`), + []byte(`[{"id":"winlog-default"}]`), nil, "", }, @@ -113,6 +116,7 @@ func TestBulkSimple(t *testing.T) { "multiFieldId", "online", []byte(`{"hey":"now","brown":"cow"}`), + []byte(`[{"id":"winlog-default","type":"winlog"}]`), nil, ver, }, @@ -121,6 +125,7 @@ func TestBulkSimple(t *testing.T) { "multiFieldNestedId", "online", []byte(`{"hey":"now","wee":{"little":"doggie"}}`), + []byte(`[{"id":"winlog-default","type":"winlog"}]`), nil, "", }, @@ -129,6 +134,7 @@ func TestBulkSimple(t *testing.T) { "simpleseqno", "online", nil, + nil, sqn.SeqNo{1, 2, 3, 4}, ver, }, @@ -137,6 +143,7 @@ func TestBulkSimple(t *testing.T) { "simpleseqno", "online", []byte(`{"uncle":"fester"}`), + 
[]byte(`[{"id":"log-default"}]`), sqn.SeqNo{5, 6, 7, 8}, ver, }, @@ -146,6 +153,7 @@ func TestBulkSimple(t *testing.T) { "unusual", nil, nil, + nil, "", }, { @@ -154,6 +162,7 @@ func TestBulkSimple(t *testing.T) { "", nil, nil, + nil, "", }, } @@ -165,7 +174,7 @@ func TestBulkSimple(t *testing.T) { mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() bc := NewBulk(mockBulk) - if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil { + if err := bc.CheckIn(c.id, c.status, "", c.meta, c.components, c.seqno, c.ver); err != nil { t.Fatal(err) } @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) { for i := 0; i < b.N; i++ { for _, id := range ids { - err := bc.CheckIn(id, "", nil, nil, "") + err := bc.CheckIn(id, "", "", nil, nil, nil, "") if err != nil { b.Fatal(err) } diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 14c5bc7a6..c0f1f2c0b 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -33,7 +33,9 @@ const ( FieldCoordinatorIdx = "coordinator_idx" FieldLastCheckin = "last_checkin" FieldLastCheckinStatus = "last_checkin_status" + FieldLastCheckinMessage = "last_checkin_message" FieldLocalMetadata = "local_metadata" + FieldComponents = "components" FieldPolicyRevisionIdx = "policy_revision_idx" FieldPolicyCoordinatorIdx = "policy_coordinator_idx" FieldDefaultAPIKey = "default_api_key" diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index fca90db0b..51856533e 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -124,6 +124,9 @@ type Agent struct { Active bool `json:"active"` Agent *AgentMetadata `json:"agent,omitempty"` + // Elastic Agent components detailed status information + Components json.RawMessage `json:"components,omitempty"` + // API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKey string 
`json:"default_api_key,omitempty"` @@ -139,6 +142,9 @@ type Agent struct { // Date/time the Elastic Agent checked in last time LastCheckin string `json:"last_checkin,omitempty"` + // Last checkin message + LastCheckinMessage string `json:"last_checkin_message,omitempty"` + // Lst checkin status LastCheckinStatus string `json:"last_checkin_status,omitempty"` @@ -243,6 +249,10 @@ type Artifact struct { type Body struct { } +// Components Elastic Agent components detailed status information +type Components struct { +} + // Data The opaque payload. type Data struct { } diff --git a/model/schema.json b/model/schema.json index 09eda21a9..fe390db9e 100644 --- a/model/schema.json +++ b/model/schema.json @@ -454,6 +454,15 @@ "description": "Lst checkin status", "type": "string" }, + "last_checkin_message": { + "description": "Last checkin message", + "type": "string" + }, + "components": { + "description": "Elastic Agent components detailed status information", + "type": "object", + "format": "raw" + }, "default_api_key_id": { "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string"