Skip to content

Commit

Permalink
Support for Elastic Agent V2 status (#1747)
Browse files Browse the repository at this point in the history
* Support for Elastic Agent V2 status

* Make 'make check-ci' happy

* Add a check that 'components' is valid array

* Rename variable to better reflect it's meaning
  • Loading branch information
aleksmaus authored Aug 16, 2022
1 parent 181295d commit eb8eca7
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 34 deletions.
64 changes: 62 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
return err
}

// Compare agent_components content and update if different
rawComponents, err := parseComponents(zlog, agent, &req)
if err != nil {
return err
}

// Resolve AckToken from request, fallback on the agent record
seqno, err := ct.resolveSeqNo(ctx, zlog, req, agent)
if err != nil {
Expand Down Expand Up @@ -237,7 +243,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver)
err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, rawComponents, seqno, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -277,7 +283,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver)
err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, rawComponents, nil, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -555,6 +561,60 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]
return outMeta, nil
}

func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, error) {

// Quick comparison first; compare the JSON payloads.
// If the data is not consistently normalized, this short-circuit will not work.
if bytes.Equal(req.Components, agent.Components) {
zlog.Trace().Msg("quick comparing agent components data is equal")
return nil, nil
}

// Deserialize the request components data
var reqComponents interface{}
if len(req.Components) > 0 {
if err := json.Unmarshal(req.Components, &reqComponents); err != nil {
return nil, errors.Wrap(err, "parseComponents request")
}
// Validate that components is an array
if _, ok := reqComponents.([]interface{}); !ok {
return nil, errors.Wrap(errors.New("components property is not array"), "parseComponents request")
}
}

// If empty, don't step on existing data
if reqComponents == nil {
return nil, nil
}

// Deserialize the agent's components copy
var agentComponents interface{}
if len(agent.Components) > 0 {
if err := json.Unmarshal(agent.Components, &agentComponents); err != nil {
return nil, errors.Wrap(err, "parseComponents local")
}
}

var outComponents []byte

// Compare the deserialized meta structures and return the bytes to update if different
if !reflect.DeepEqual(reqComponents, agentComponents) {

zlog.Trace().
RawJSON("oldComponents", agent.Components).
RawJSON("newComponents", req.Components).
Msg("local components data is not equal")

zlog.Info().
RawJSON("req.Components", req.Components).
Msg("applying new components data")

outComponents = req.Components
}

return outComponents, nil
}

func calcPollDuration(zlog zerolog.Logger, cfg *config.Server, setupDuration time.Duration) (time.Duration, time.Duration) {

pollDuration := cfg.Timeouts.CheckinLongPoll
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ type EnrollResponse struct {
}

type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []Event `json:"events"`
LocalMeta json.RawMessage `json:"local_metadata"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []Event `json:"events"`
LocalMeta json.RawMessage `json:"local_metadata"`
Message string `json:"message"` // V2 Agent message
Components json.RawMessage `json:"components,omitempty"` // V2 Agent components
}

type CheckinResponse struct {
Expand Down
51 changes: 31 additions & 20 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ func WithFlushInterval(d time.Duration) Opt {
}

type extraT struct {
meta []byte
seqNo sqn.SeqNo
ver string
meta []byte
seqNo sqn.SeqNo
ver string
components []byte
}

// Minimize the size of this structure.
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
status string
extra *extraT
ts string
status string
message string
extra *extraT
}

// Bulk will batch pending checkins and update elasticsearch at a set interval.
Expand Down Expand Up @@ -98,25 +100,27 @@ func (bc *Bulk) timestamp() string {
// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error {
func (bc *Bulk) CheckIn(id string, status, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
var extra *extraT
if meta != nil || seqno.IsSet() || newVer != "" {
if meta != nil || seqno.IsSet() || newVer != "" || components != nil {
extra = &extraT{
meta: meta,
seqNo: seqno,
ver: newVer,
meta: meta,
seqNo: seqno,
ver: newVer,
components: components,
}
}

bc.mut.Lock()

bc.pending[id] = pendingT{
ts: bc.timestamp(),
status: status,
extra: extra,
ts: bc.timestamp(),
status: status,
message: message,
extra: extra,
}

bc.mut.Unlock()
Expand Down Expand Up @@ -180,9 +184,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
}
if body, err = fields.Marshal(); err != nil {
return err
Expand All @@ -192,9 +197,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
} else {

fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
}

// If the agent version is not empty it needs to be updated
Expand All @@ -213,6 +219,11 @@ func (bc *Bulk) flush(ctx context.Context) error {
fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
}

// Update components if provided
if pendingData.extra.components != nil {
fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
}

// If seqNo changed, set the field appropriately
if pendingData.extra.seqNo.IsSet() {
fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
Expand Down
25 changes: 17 additions & 8 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b
}

type bulkcase struct {
desc string
id string
status string
meta []byte
seqno sqn.SeqNo
ver string
desc string
id string
status string
meta []byte
components []byte
seqno sqn.SeqNo
ver string
}

func TestBulkSimple(t *testing.T) {
Expand All @@ -98,13 +99,15 @@ func TestBulkSimple(t *testing.T) {
"online",
nil,
nil,
nil,
"",
},
{
"Singled field case",
"singleFieldId",
"online",
[]byte(`{"hey":"now"}`),
[]byte(`[{"id":"winlog-default"}]`),
nil,
"",
},
Expand All @@ -113,6 +116,7 @@ func TestBulkSimple(t *testing.T) {
"multiFieldId",
"online",
[]byte(`{"hey":"now","brown":"cow"}`),
[]byte(`[{"id":"winlog-default","type":"winlog"}]`),
nil,
ver,
},
Expand All @@ -121,6 +125,7 @@ func TestBulkSimple(t *testing.T) {
"multiFieldNestedId",
"online",
[]byte(`{"hey":"now","wee":{"little":"doggie"}}`),
[]byte(`[{"id":"winlog-default","type":"winlog"}]`),
nil,
"",
},
Expand All @@ -129,6 +134,7 @@ func TestBulkSimple(t *testing.T) {
"simpleseqno",
"online",
nil,
nil,
sqn.SeqNo{1, 2, 3, 4},
ver,
},
Expand All @@ -137,6 +143,7 @@ func TestBulkSimple(t *testing.T) {
"simpleseqno",
"online",
[]byte(`{"uncle":"fester"}`),
[]byte(`[{"id":"log-default"}]`),
sqn.SeqNo{5, 6, 7, 8},
ver,
},
Expand All @@ -146,6 +153,7 @@ func TestBulkSimple(t *testing.T) {
"unusual",
nil,
nil,
nil,
"",
},
{
Expand All @@ -154,6 +162,7 @@ func TestBulkSimple(t *testing.T) {
"",
nil,
nil,
nil,
"",
},
}
Expand All @@ -165,7 +174,7 @@ func TestBulkSimple(t *testing.T) {
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()
bc := NewBulk(mockBulk)

if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil {
if err := bc.CheckIn(c.id, c.status, "", c.meta, c.components, c.seqno, c.ver); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) {
for i := 0; i < b.N; i++ {

for _, id := range ids {
err := bc.CheckIn(id, "", nil, nil, "")
err := bc.CheckIn(id, "", "", nil, nil, nil, "")
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ const (
FieldCoordinatorIdx = "coordinator_idx"
FieldLastCheckin = "last_checkin"
FieldLastCheckinStatus = "last_checkin_status"
FieldLastCheckinMessage = "last_checkin_message"
FieldLocalMetadata = "local_metadata"
FieldComponents = "components"
FieldPolicyRevisionIdx = "policy_revision_idx"
FieldPolicyCoordinatorIdx = "policy_coordinator_idx"
FieldDefaultAPIKey = "default_api_key"
Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,15 @@
"description": "Lst checkin status",
"type": "string"
},
"last_checkin_message": {
"description": "Last checkin message",
"type": "string"
},
"components": {
"description": "Elastic Agent components detailed status information",
"type": "object",
"format": "raw"
},
"default_api_key_id": {
"description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch",
"type": "string"
Expand Down

0 comments on commit eb8eca7

Please sign in to comment.