From ca9041fced78f04541fe7dbc7e8495706608355d Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Tue, 20 Sep 2022 17:08:10 +0200
Subject: [PATCH] Fix and reintroduce "Allow multiple ES outputs as long as they are the same ES" (#1879)

* Revert "Revert "Fix v8.5.0 migration painless script" (#1878)"

This reverts commit ef9ca2bd78641e48e8ac2356989dfbfeebc4efa6.

* Revert "Revert "Allow multiple ES outputs as long as they are the same ES (#1684)""

This reverts commit bb696ac0be53b6ef4a9143eff12a5a3263400585.

* avoid new API keys being marked for invalidation

Co-authored-by: Michal Pristas
He fixed the merge conflicts after the Bulk API Keys update (#1779), commit
46ac14bafa6a52366ca2ef528b2bef18636fc43d, got merged.
---
 cmd/fleet/main.go | 14 +-
 internal/pkg/api/handleAck.go | 111 +++---
 internal/pkg/api/handleAck_test.go | 55 ++-
 internal/pkg/api/handleCheckin.go | 15 +-
 internal/pkg/api/handleEnroll.go | 11 +-
 internal/pkg/apikey/apikey.go | 67 +++-
 .../pkg/apikey/apikey_integration_test.go | 98 ++++-
 internal/pkg/apikey/create.go | 1 -
 internal/pkg/apikey/get.go | 74 ----
 internal/pkg/apikey/invalidate.go | 1 -
 internal/pkg/apikey/metadata.go | 20 +-
 internal/pkg/bulk/opBulk.go | 16 +-
 internal/pkg/coordinator/monitor.go | 17 +-
 .../coordinator/monitor_integration_test.go | 23 +-
 internal/pkg/dl/agent.go | 22 +-
 internal/pkg/dl/agent_integration_test.go | 45 +++
 internal/pkg/dl/constants.go | 33 +-
 internal/pkg/dl/migration.go | 290 ++++++++++----
 internal/pkg/dl/migration_integration_test.go | 295 +++++++++++++++
 internal/pkg/es/bulk_update_api_key.go | 95 +++--
 internal/pkg/es/error.go | 18 +-
 internal/pkg/model/ext.go | 28 +-
 internal/pkg/model/ext_test.go | 55 ++-
 internal/pkg/model/schema.go | 53 ++-
 internal/pkg/policy/parsed_policy.go | 10 +-
 internal/pkg/policy/parsed_policy_test.go | 1 -
 internal/pkg/policy/policy_output.go | 356 +++++++++++-------
 .../policy/policy_output_integration_test.go | 200 ++++++++++
 internal/pkg/policy/policy_output_test.go | 135 +++++--
 internal/pkg/testing/esutil/bootstrap.go | 2 +-
 internal/pkg/testing/setup.go | 16 +-
 model/schema.json | 89 ++++-
 32 files changed, 1708 insertions(+), 558 deletions(-)
 delete mode 100644 internal/pkg/apikey/get.go
 create mode 100644 internal/pkg/dl/migration_integration_test.go
 create mode 100644 internal/pkg/policy/policy_output_integration_test.go

diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go
index a28ec50c3..976564cb8 100644
--- a/cmd/fleet/main.go
+++ b/cmd/fleet/main.go
@@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
 remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
 if err != nil {
 if len(remoteVersion) != 0 {
- return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
+ return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
+ f.bi.Version, remoteVersion, err)
 }
 return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
 }
- // Run migrations; current safe to do in background. That may change in the future.
- g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { + // Run migrations + loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { return dl.Migrate(ctx, bulker) - })) + }) + if err = loggedMigration(); err != nil { + return fmt.Errorf("failed to run subsystems: %w", err) + } - // Run schduler for periodic GC/cleanup + // Run scheduler for periodic GC/cleanup gcCfg := cfg.Inputs[0].Server.GC sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval)) if err != nil { diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 8ba12cea1..c69a65b2c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -15,6 +15,11 @@ import ( "strings" "time" + "github.com/julienschmidt/httprouter" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -25,11 +30,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" "github.com/elastic/fleet-server/v7/internal/pkg/smap" - "github.com/pkg/errors" - - "github.com/julienschmidt/httprouter" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) type HTTPError struct { @@ -338,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag Int64("rev.coordinatorIdx", rev.CoordinatorIdx). Msg("ack policy revision") - if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev || - (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { + if ok && rev.PolicyID == agent.PolicyID && + (rev.RevisionIdx > currRev || + (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { found = true currRev = rev.RevisionIdx currCoord = rev.CoordinatorIdx @@ -350,12 +351,39 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return nil } - if agent.DefaultAPIKeyID != "" { - res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true) + for _, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + + err := ack.updateAPIKey(ctx, + zlog, + agent.Id, + currRev, currCoord, + agent.PolicyID, + output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) + if err != nil { + return err + } + } + + return nil + +} + +func (ack *AckT) updateAPIKey(ctx context.Context, + zlog zerolog.Logger, + agentID string, + currRev, currCoord int64, + policyID, apiKeyID, permissionHash string, + toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { + + if apiKeyID != "" { + res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { zlog.Error(). Err(err). - Str(LogAPIKeyID, agent.DefaultAPIKeyID). + Str(LogAPIKeyID, apiKeyID). Msg("Failed to read API Key roles") } else { clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) @@ -363,41 +391,26 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag zlog.Error(). Err(err). RawJSON("roles", res.RoleDescriptors). - Str(LogAPIKeyID, agent.DefaultAPIKeyID). + Str(LogAPIKeyID, apiKeyID). 
Msg("Failed to cleanup roles") } else if removedRolesCount > 0 { - if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil { - zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key") + if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil { + zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key") } else { zlog.Debug(). - Str("hash.sha256", agent.PolicyOutputPermissionsHash). - Str(LogAPIKeyID, agent.DefaultAPIKeyID). + Str("hash.sha256", permissionHash). + Str(LogAPIKeyID, apiKeyID). RawJSON("roles", clean). Int("removedRoles", removedRolesCount). Msg("Updating agent record to pick up reduced roles.") } } } - } - - sz := len(agent.DefaultAPIKeyHistory) - if sz > 0 { - ids := make([]string, sz) - for i := 0; i < sz; i++ { - if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID { - // already updated - continue - } - ids[i] = agent.DefaultAPIKeyHistory[i].ID - } - log.Info().Strs("ids", ids).Msg("Invalidate old API keys") - if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { - log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") - } + ack.invalidateAPIKeys(ctx, toRetireAPIKeyIDs, apiKeyID) } body := makeUpdatePolicyBody( - agent.PolicyID, + policyID, currRev, currCoord, ) @@ -405,14 +418,14 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag err := ack.bulk.Update( ctx, dl.FleetAgents, - agent.Id, + agentID, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3), ) zlog.Err(err). - Str(LogPolicyID, agent.PolicyID). + Str(LogPolicyID, policyID). Int64("policyRevision", currRev). Int64("policyCoordinator", currCoord). Msg("ack policy") @@ -445,8 +458,25 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) { return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition") } +func (ack *AckT) invalidateAPIKeys(ctx context.Context, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) { + ids := make([]string, 0, len(toRetireAPIKeyIDs)) + for _, k := range toRetireAPIKeyIDs { + if k.ID == skip || k.ID == "" { + continue + } + ids = append(ids, k.ID) + } + + if len(ids) > 0 { + log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys") + if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { + log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") + } + } +} + func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error { - apiKeys := _getAPIKeyIDs(agent) + apiKeys := agent.APIKeyIDs() if len(apiKeys) > 0 { zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger() @@ -501,17 +531,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } -func _getAPIKeyIDs(agent *model.Agent) []string { - keys := make([]string, 0, 1) - if agent.AccessAPIKeyID != "" { - keys = append(keys, agent.AccessAPIKeyID) - } - if agent.DefaultAPIKeyID != "" { - keys = append(keys, agent.DefaultAPIKeyID) - } - return keys -} - // Generate an update script that validates that the policy_id // has not changed underneath us by an upstream process (Kibana or otherwise). 
// We have a race condition where a user could have assigned a new policy to diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 90c961456..29c678c24 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -15,13 +15,14 @@ import ( "net/http" "testing" + "github.com/google/go-cmp/cmp" + "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -439,3 +440,55 @@ func TestHandleAckEvents(t *testing.T) { }) } } + +func TestInvalidateAPIKeys(t *testing.T) { + toRetire1 := []model.ToRetireAPIKeyIdsItems{{ + ID: "toRetire1", + }} + toRetire2 := []model.ToRetireAPIKeyIdsItems{{ + ID: "toRetire2_0", + }, { + ID: "toRetire2_1", + }} + var toRetire3 []model.ToRetireAPIKeyIdsItems + + skips := map[string]string{ + "1": "toRetire1", + "2": "toRetire2_0", + "3": "", + } + wants := map[string][]string{ + "1": {}, + "2": {"toRetire2_1"}, + "3": {}, + } + + agent := model.Agent{ + Outputs: map[string]*model.PolicyOutput{ + "1": {ToRetireAPIKeyIds: toRetire1}, + "2": {ToRetireAPIKeyIds: toRetire2}, + "3": {ToRetireAPIKeyIds: toRetire3}, + }, + } + + for i, out := range agent.Outputs { + skip := skips[i] + want := wants[i] + + bulker := ftesting.NewMockBulk() + if len(want) > 0 { + bulker.On("APIKeyInvalidate", + context.Background(), mock.MatchedBy(func(ids []string) bool { + // if A contains B and B contains A => A = B + return assert.Subset(t, ids, want) && + assert.Subset(t, want, ids) + })). + Return(nil) + } + + ack := &AckT{bulk: bulker} + ack.invalidateAPIKeys(context.Background(), out.ToRetireAPIKeyIds, skip) + + bulker.AssertExpectations(t) + } +} diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 721ee4538..2752dd147 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -10,6 +10,7 @@ import ( "compress/gzip" "context" "encoding/json" + "fmt" "math/rand" "net/http" "reflect" @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro Logger() err := rt.ct.handleCheckin(&zlog, w, r, id) - if err != nil { cntCheckin.IncError(err) resp := NewHTTPErrResp(err) @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin // func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) { zlog = zlog.With(). - Str("ctx", "processPolicy"). - Int64("policyRevision", pp.Policy.RevisionIdx). - Int64("policyCoordinator", pp.Policy.CoordinatorIdx). + Str("fleet.ctx", "processPolicy"). + Int64("fleet.policyRevision", pp.Policy.RevisionIdx). + Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() - // Repull and decode the agent object. Do not trust the cache. + // Repull and decode the agent object. Do not trust the cache. 
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Error().Err(err).Msg("fail find agent record") @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Parse the outputs maps in order to prepare the outputs const outputsProperty = "outputs" outputs, err := smap.Parse(pp.Fields[outputsProperty]) - if err != nil { return nil, err } @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs) - if err != nil { - return nil, err + return nil, fmt.Errorf("failed to prepare output %q: %w", + policyOutput.Name, err) } } diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go index e5575e3b3..9123723d6 100644 --- a/internal/pkg/api/handleEnroll.go +++ b/internal/pkg/api/handleEnroll.go @@ -53,7 +53,6 @@ type EnrollerT struct { } func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) { - log.Info(). Interface("limits", cfg.Limits.EnrollLimit). Msg("Setting config enroll_limit") @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger, return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver) } -func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) { +func (et *EnrollerT) _enroll( + ctx context.Context, + rb *rollback.Rollback, + zlog zerolog.Logger, + req *EnrollRequest, + policyID, + ver string) (*EnrollResponse, error) { if req.SharedID != "" { // TODO: Support pre-existing install @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) ( agentID, "", []byte(kFleetAccessRolesJSON), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) } diff --git a/internal/pkg/apikey/apikey.go b/internal/pkg/apikey/apikey.go index 4924a647b..05551f272 100644 --- a/internal/pkg/apikey/apikey.go +++ b/internal/pkg/apikey/apikey.go @@ -6,12 +6,17 @@ package apikey import ( + "context" "encoding/base64" - "errors" + "encoding/json" "fmt" "net/http" "strings" "unicode/utf8" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/pkg/errors" ) const ( @@ -28,6 +33,66 @@ var ( var AuthKey = http.CanonicalHeaderKey("Authorization") +// APIKeyMetadata tracks Metadata associated with an APIKey. +type APIKeyMetadata struct { + ID string + Metadata Metadata + RoleDescriptors json.RawMessage +} + +// Read gathers APIKeyMetadata from Elasticsearch using the given client. 
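+// It queries the ES security API (Security.GetAPIKey) for the given ID,
+// optionally limiting the lookup to keys owned by the authenticated user, and
+// returns ErrAPIKeyNotFound when Elasticsearch answers with an error or with
+// no API keys; only the first key in the response is returned. (This is a
+// summary of the behaviour implemented below, added here as documentation.)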
+func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) { + + opts := []func(*esapi.SecurityGetAPIKeyRequest){ + client.Security.GetAPIKey.WithContext(ctx), + client.Security.GetAPIKey.WithID(id), + } + if withOwner { + opts = append(opts, client.Security.GetAPIKey.WithOwner(true)) + } + + res, err := client.Security.GetAPIKey( + opts..., + ) + + if err != nil { + return nil, fmt.Errorf("request to elasticsearch failed: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound) + } + + type APIKeyResponse struct { + ID string `json:"id"` + Metadata Metadata `json:"metadata"` + RoleDescriptors json.RawMessage `json:"role_descriptors"` + } + type GetAPIKeyResponse struct { + APIKeys []APIKeyResponse `json:"api_keys"` + } + + var resp GetAPIKeyResponse + d := json.NewDecoder(res.Body) + if err = d.Decode(&resp); err != nil { + return nil, fmt.Errorf( + "could not decode elasticsearch GetAPIKeyResponse: %w", err) + } + + if len(resp.APIKeys) == 0 { + return nil, ErrAPIKeyNotFound + } + + first := resp.APIKeys[0] + + return &APIKeyMetadata{ + ID: first.ID, + Metadata: first.Metadata, + RoleDescriptors: first.RoleDescriptors, + }, nil +} + // APIKey is used to represent an Elasticsearch API Key. type APIKey struct { ID string diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index 4647abcea..ce4529254 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -30,7 +30,7 @@ const testFleetRoles = ` } ` -func TestCreateAPIKeyWithMetadata(t *testing.T) { +func TestRead_existingKey(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -48,7 +48,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { agentID := uuid.Must(uuid.NewV4()).String() name := uuid.Must(uuid.NewV4()).String() akey, err := Create(ctx, es, name, "", "true", []byte(testFleetRoles), - NewMetadata(agentID, TypeAccess)) + NewMetadata(agentID, "", TypeAccess)) if err != nil { t.Fatal(err) } @@ -79,9 +79,99 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { t.Error(diff) } - // Try to get the key that doesn't exists, expect ErrApiKeyNotFound +} + +func TestRead_noKey(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + cfg := elasticsearch.Config{ + Username: "elastic", + Password: "changeme", + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatal(err) + } + + // Try to get the key that doesn't exist, expect ErrApiKeyNotFound _, err = Read(ctx, es, "0000000000000", false) if !errors.Is(err, ErrAPIKeyNotFound) { - t.Errorf("Unexpected error type: %v", err) + t.Errorf("Unexpected error: %v", err) + } +} + +func TestCreateAPIKeyWithMetadata(t *testing.T) { + tts := []struct { + name string + outputName string + }{ + {name: "with metadata.output_name", outputName: "a_output_name"}, + {name: "without metadata.output_name"}, + } + + for _, tt := range tts { + t.Run(tt.name, func(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + cfg := elasticsearch.Config{ + Username: "elastic", + Password: "changeme", + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatal(err) + } + + // Create the API key + agentID := uuid.Must(uuid.NewV4()).String() + name := uuid.Must(uuid.NewV4()).String() + outputName := tt.outputName + apiKey, err := Create( + ctx, + es, + name, + "", 
+ "true", + []byte(testFleetRoles), + NewMetadata(agentID, outputName, TypeAccess)) + if err != nil { + t.Fatal(err) + } + + // Get the API key and verify that the metadata was saved correctly + aKeyMeta, err := Read(ctx, es, apiKey.ID, false) + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(outputName, aKeyMeta.Metadata.OutputName) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) + if diff != "" { + t.Error(diff) + } + }) } } diff --git a/internal/pkg/apikey/create.go b/internal/pkg/apikey/create.go index f3cee99f8..de61390c3 100644 --- a/internal/pkg/apikey/create.go +++ b/internal/pkg/apikey/create.go @@ -42,7 +42,6 @@ func Create(ctx context.Context, client *elasticsearch.Client, name, ttl, refres bytes.NewReader(body), opts..., ) - if err != nil { return nil, err } diff --git a/internal/pkg/apikey/get.go b/internal/pkg/apikey/get.go deleted file mode 100644 index 9d45f2908..000000000 --- a/internal/pkg/apikey/get.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package apikey - -import ( - "context" - "encoding/json" - - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/pkg/errors" -) - -// APIKetMetadata tracks Metadata associated with an APIKey. -type APIKeyMetadata struct { - ID string - Metadata Metadata - RoleDescriptors json.RawMessage -} - -// Read gathers APIKeyMetadata from Elasticsearch using the given client. 
-func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) {
-
- opts := []func(*esapi.SecurityGetAPIKeyRequest){
- client.Security.GetAPIKey.WithContext(ctx),
- client.Security.GetAPIKey.WithID(id),
- }
- if withOwner {
- opts = append(opts, client.Security.GetAPIKey.WithOwner(true))
- }
-
- res, err := client.Security.GetAPIKey(
- opts...,
- )
-
- if err != nil {
- return nil, err
- }
- defer res.Body.Close()
-
- if res.IsError() {
- err = errors.Wrap(ErrAPIKeyNotFound, res.String())
- return nil, err
- }
-
- type APIKeyResponse struct {
- ID string `json:"id"`
- Metadata Metadata `json:"metadata"`
- RoleDescriptors json.RawMessage `json:"role_descriptors"`
- }
- type GetAPIKeyResponse struct {
- APIKeys []APIKeyResponse `json:"api_keys"`
- }
-
- var resp GetAPIKeyResponse
- d := json.NewDecoder(res.Body)
- if err = d.Decode(&resp); err != nil {
- return nil, err
- }
-
- if len(resp.APIKeys) == 0 {
- return nil, ErrAPIKeyNotFound
- }
-
- first := resp.APIKeys[0]
-
- return &APIKeyMetadata{
- ID: first.ID,
- Metadata: first.Metadata,
- RoleDescriptors: first.RoleDescriptors,
- }, nil
-}
diff --git a/internal/pkg/apikey/invalidate.go b/internal/pkg/apikey/invalidate.go
index 421662388..6c5d5d304 100644
--- a/internal/pkg/apikey/invalidate.go
+++ b/internal/pkg/apikey/invalidate.go
@@ -38,7 +38,6 @@ func Invalidate(ctx context.Context, client *elasticsearch.Client, ids ...string
 bytes.NewReader(body),
 opts...,
 )
-
 if err != nil {
 return fmt.Errorf("InvalidateAPIKey: %w", err)
 }
diff --git a/internal/pkg/apikey/metadata.go b/internal/pkg/apikey/metadata.go
index c80997c7b..d00380c01 100644
--- a/internal/pkg/apikey/metadata.go
+++ b/internal/pkg/apikey/metadata.go
@@ -19,18 +19,20 @@ func (t Type) String() string {
 
 // Metadata is additional information associated with an APIKey.
 type Metadata struct {
- AgentID string `json:"agent_id,omitempty"`
- Managed bool `json:"managed,omitempty"`
- ManagedBy string `json:"managed_by,omitempty"`
- Type string `json:"type,omitempty"`
+ AgentID string `json:"agent_id,omitempty"`
+ Managed bool `json:"managed,omitempty"`
+ ManagedBy string `json:"managed_by,omitempty"`
+ OutputName string `json:"output_name,omitempty"`
+ Type string `json:"type,omitempty"`
 }
 
 // NewMetadata returns Metadata for the given agentID.
-func NewMetadata(agentID string, typ Type) Metadata {
+func NewMetadata(agentID string, outputName string, typ Type) Metadata {
 return Metadata{
- AgentID: agentID,
- Managed: true,
- ManagedBy: ManagedByFleetServer,
- Type: typ.String(),
+ AgentID: agentID,
+ Managed: true,
+ ManagedBy: ManagedByFleetServer,
+ OutputName: outputName,
+ Type: typ.String(),
 }
 }
diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go
index 50b2c47e0..d47ba9592 100644
--- a/internal/pkg/bulk/opBulk.go
+++ b/internal/pkg/bulk/opBulk.go
@@ -7,6 +7,7 @@ package bulk
 import (
 "bytes"
 "context"
+ "errors"
 "fmt"
 "time"
 
@@ -187,7 +188,6 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
 }
 
 res, err := req.Do(ctx, b.es)
-
 if err != nil {
 log.Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do")
 return err
@@ -217,12 +217,18 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
 var blk bulkIndexerResponse
 blk.Items = make([]bulkStubItem, 0, queueCnt)
 
+ // TODO: We're losing information about the errors; we should find a way
+ // to return the full error ES returns
 if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil {
- log.Error().
- Err(err).
+ log.Err(err). Str("mod", kModBulk). - Msg("Unmarshal error") - return err + Msg("flushBulk failed, could not unmarshal ES response") + return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err) + } + if blk.HasErrors { + // We lack information to properly correlate this error with what has failed. + // Thus, for now it'd be more noise than information outside an investigation. + log.Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } log.Trace(). diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 53870e58e..2242305f8 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -508,7 +508,7 @@ func runUnenroller(ctx context.Context, bulker bulk.Bulk, policyID string, unenr func runUnenrollerWork(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, zlog zerolog.Logger, agentsIndex string) error { agents, err := dl.FindOfflineAgents(ctx, bulker, policyID, unenrollTimeout, dl.WithIndexName(agentsIndex)) - if err != nil || len(agents) == 0 { + if err != nil { return err } @@ -540,11 +540,13 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a dl.FieldUnenrolledReason: unenrolledReasonTimeout, dl.FieldUpdatedAt: now, } + body, err := fields.Marshal() if err != nil { return err } - apiKeys := getAPIKeyIDs(agent) + + apiKeys := agent.APIKeyIDs() zlog = zlog.With(). Str(logger.AgentID, agent.Id). @@ -567,17 +569,6 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } -func getAPIKeyIDs(agent *model.Agent) []string { - keys := make([]string, 0, 1) - if agent.AccessAPIKeyID != "" { - keys = append(keys, agent.AccessAPIKeyID) - } - if agent.DefaultAPIKeyID != "" { - keys = append(keys, agent.DefaultAPIKeyID) - } - return keys -} - func waitWithContext(ctx context.Context, to time.Duration) error { t := time.NewTimer(to) defer t.Stop() diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go index ffef699d1..defc4a9c7 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -159,7 +159,7 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -167,20 +167,21 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "default", apikey.TypeAccess), ) require.NoError(t, err) // add agent that should be unenrolled sixAgo := time.Now().UTC().Add(-6 * time.Minute) agentBody, err := json.Marshal(model.Agent{ - AccessAPIKeyID: accessKey.ID, - DefaultAPIKeyID: outputKey.ID, - Active: true, - EnrolledAt: sixAgo.Format(time.RFC3339), - LastCheckin: sixAgo.Format(time.RFC3339), - PolicyID: policy1Id, - UpdatedAt: sixAgo.Format(time.RFC3339), + AccessAPIKeyID: accessKey.ID, + Outputs: map[string]*model.PolicyOutput{ + "default": {APIKeyID: outputKey.ID}}, + Active: true, + EnrolledAt: sixAgo.Format(time.RFC3339), + LastCheckin: sixAgo.Format(time.RFC3339), + PolicyID: policy1Id, + UpdatedAt: sixAgo.Format(time.RFC3339), }) require.NoError(t, err) _, err = bulker.Create(ctx, agentsIndex, agentID, agentBody) @@ -306,7 +307,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { 
agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -314,7 +315,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "default", apikey.TypeAccess), ) require.NoError(t, err) diff --git a/internal/pkg/dl/agent.go b/internal/pkg/dl/agent.go index 1d52082f7..a4871fa73 100644 --- a/internal/pkg/dl/agent.go +++ b/internal/pkg/dl/agent.go @@ -6,6 +6,7 @@ package dl import ( "context" + "fmt" "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" @@ -48,19 +49,23 @@ func prepareOfflineAgentsByPolicyID() *dsl.Tmpl { return tmpl } -func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (agent model.Agent, err error) { +func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (model.Agent, error) { o := newOption(FleetAgents, opt...) res, err := SearchWithOneParam(ctx, bulker, tmpl, o.indexName, name, v) if err != nil { - return + return model.Agent{}, fmt.Errorf("failed searching for agent: %w", err) } if len(res.Hits) == 0 { - return agent, ErrNotFound + return model.Agent{}, ErrNotFound } - err = res.Hits[0].Unmarshal(&agent) - return agent, err + var agent model.Agent + if err = res.Hits[0].Unmarshal(&agent); err != nil { + return model.Agent{}, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) + } + + return agent, nil } func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, opt ...Option) ([]model.Agent, error) { @@ -71,18 +76,19 @@ func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, u FieldLastCheckin: past, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed searching for agent: %w", err) } if len(res.Hits) == 0 { - return nil, nil + return nil, ErrNotFound } agents := make([]model.Agent, len(res.Hits)) for i, hit := range res.Hits { if err := hit.Unmarshal(&agents[i]); err != nil { - return nil, err + return nil, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) } } + return agents, nil } diff --git a/internal/pkg/dl/agent_integration_test.go b/internal/pkg/dl/agent_integration_test.go index 4e65ddb94..3baab6c7e 100644 --- a/internal/pkg/dl/agent_integration_test.go +++ b/internal/pkg/dl/agent_integration_test.go @@ -108,3 +108,48 @@ func TestFindOfflineAgents(t *testing.T) { require.Len(t, agents, 2) assert.EqualValues(t, []string{twoDayOldID, threeDayOldID}, []string{agents[0].Id, agents[1].Id}) } + +func TestFindAgent_NewModel(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + policyID := uuid.Must(uuid.NewV4()).String() + agentID := uuid.Must(uuid.NewV4()).String() + + wantOutputs := map[string]*model.PolicyOutput{ + "default": { + Type: "elasticsearch", + APIKey: "TestFindNewModelAgent_APIKey", + ToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{ + { + ID: "TestFindNewModelAgent_APIKeyID_invalidated", + RetiredAt: "TestFindNewModelAgent_APIKeyID_invalidated_at"}, + }, + APIKeyID: "TestFindNewModelAgent_APIKeyID", + PermissionsHash: "TestFindNewModelAgent_PermisPolicysionsHash", + }, + } + body, err := json.Marshal(model.Agent{ + PolicyID: policyID, + 
Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + Outputs: wantOutputs, + }) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + agent, err := FindAgent( + context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) + require.NoError(t, err) + + assert.Equal(t, agentID, agent.Id) + assert.Equal(t, wantOutputs, agent.Outputs) +} diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 9fb0614da..419f92874 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -27,22 +27,23 @@ const ( FieldMaxSeqNo = "max_seq_no" FieldActionSeqNo = "action_seq_no" - FieldActionID = "action_id" - FieldPolicyID = "policy_id" - FieldRevisionIdx = "revision_idx" - FieldCoordinatorIdx = "coordinator_idx" - FieldLastCheckin = "last_checkin" - FieldLastCheckinStatus = "last_checkin_status" - FieldLocalMetadata = "local_metadata" - FieldPolicyRevisionIdx = "policy_revision_idx" - FieldPolicyCoordinatorIdx = "policy_coordinator_idx" - FieldDefaultAPIKey = "default_api_key" - FieldDefaultAPIKeyID = "default_api_key_id" //nolint:gosec // field name - FieldDefaultAPIKeyHistory = "default_api_key_history" //nolint:gosec // field name - FieldPolicyOutputPermissionsHash = "policy_output_permissions_hash" - FieldUnenrolledReason = "unenrolled_reason" - FieldAgentVersion = "version" - FieldAgent = "agent" + FieldActionID = "action_id" + FieldAgent = "agent" + FieldAgentVersion = "version" + FieldCoordinatorIdx = "coordinator_idx" + FieldLastCheckin = "last_checkin" + FieldLastCheckinStatus = "last_checkin_status" + FieldLocalMetadata = "local_metadata" + FieldPolicyCoordinatorIdx = "policy_coordinator_idx" + FieldPolicyID = "policy_id" + FieldPolicyOutputAPIKey = "api_key" + FieldPolicyOutputAPIKeyID = "api_key_id" + FieldPolicyOutputPermissionsHash = "permissions_hash" + FieldPolicyOutputToRetireAPIKeyIDs = "to_retire_api_key_ids" //nolint:gosec // false positive + FieldPolicyRevisionIdx = "policy_revision_idx" + FieldRevisionIdx = "revision_idx" + FieldUnenrolledReason = "unenrolled_reason" + FiledType = "type" FieldActive = "active" FieldUpdatedAt = "updated_at" diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 4beb26741..4107dfd38 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -12,59 +12,73 @@ import ( "net/http" "time" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dsl" - "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/pkg/errors" "github.com/rs/zerolog/log" -) -func Migrate(ctx context.Context, bulker bulk.Bulk) error { - return migrateAgentMetadata(ctx, bulker) -} - -// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. -// This field was populated in new enrollments in 7.15 and later; however, the -// change was not backported to support 7.14. The security team is reliant on the -// existence of this field in 7.16, so the following migration was added to -// support upgrade from 7.14. -// -// It is currently safe to run this in the background; albeit with some -// concern on conflicts. The conflict risk exists regardless as N Fleet Servers -// can be run in parallel at the same time. -// -// As the update only occurs once, the 99.9% case is a noop. 
-func migrateAgentMetadata(ctx context.Context, bulker bulk.Bulk) error {
+ "github.com/elastic/fleet-server/v7/internal/pkg/bulk"
+ "github.com/elastic/fleet-server/v7/internal/pkg/dsl"
+)
 
- root := dsl.NewRoot()
- root.Query().Bool().MustNot().Exists("agent.id")
+type (
+ migrationFn func(context.Context, bulk.Bulk) error
+ migrationBodyFn func() (string, string, []byte, error)
+ migrationResponse struct {
+ Took int `json:"took"`
+ TimedOut bool `json:"timed_out"`
+ Total int `json:"total"`
+ Updated int `json:"updated"`
+ Deleted int `json:"deleted"`
+ Batches int `json:"batches"`
+ VersionConflicts int `json:"version_conflicts"`
+ Noops int `json:"noops"`
+ Retries struct {
+ Bulk int `json:"bulk"`
+ Search int `json:"search"`
+ } `json:"retries"`
+ Failures []json.RawMessage `json:"failures"`
+ }
+)
 
- painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;"
- root.Param("script", painless)
+// timeNow is used to get the current time. It should be replaced for testing.
+var timeNow = time.Now
 
- body, err := root.MarshalJSON()
- if err != nil {
- return err
+// Migrate applies, in sequence, the migration functions. Currently, each
+// migration function is responsible for ensuring it only applies the
+// migration if needed, being a no-op otherwise.
+func Migrate(ctx context.Context, bulker bulk.Bulk) error {
+ for _, fn := range []migrationFn{migrateTov7_15, migrateToV8_5} {
+ if err := fn(ctx, bulker); err != nil {
+ return err
+ }
 }
 
-LOOP:
+ return nil
+}
+
+func migrate(ctx context.Context, bulker bulk.Bulk, fn migrationBodyFn) (int, error) {
+ var updatedDocs int
 for {
- nConflicts, err := updateAgentMetadata(ctx, bulker, body)
+ name, index, body, err := fn()
 if err != nil {
- return err
- }
- if nConflicts == 0 {
- break LOOP
+ return updatedDocs, fmt.Errorf("failed to build the migration body: %w", err)
 }
- time.Sleep(time.Second)
+ resp, err := applyMigration(ctx, name, index, bulker, body)
+ if err != nil {
+ return updatedDocs, fmt.Errorf("failed to apply migration %q: %w",
+ name, err)
+ }
+ updatedDocs += resp.Updated
+ if resp.VersionConflicts == 0 {
+ break
+ }
 }
 
- return nil
+ return updatedDocs, nil
 }
 
-func updateAgentMetadata(ctx context.Context, bulker bulk.Bulk, body []byte) (int, error) {
+func applyMigration(ctx context.Context, name string, index string, bulker bulk.Bulk, body []byte) (migrationResponse, error) {
 start := time.Now()
 
 client := bulker.Client()
@@ -78,59 +92,193 @@ func updateAgentMetadata(ctx context.Context, bulker bulk.Bulk, body []byte) (in
 client.UpdateByQuery.WithConflicts("proceed"),
 }
 
- res, err := client.UpdateByQuery([]string{FleetAgents}, opts...)
-
+ res, err := client.UpdateByQuery([]string{index}, opts...)
if err != nil { - return 0, err + return migrationResponse{}, err } if res.IsError() { if res.StatusCode == http.StatusNotFound { // Ignore index not created yet; nothing to upgrade - return 0, nil + return migrationResponse{}, nil } - return 0, fmt.Errorf("Migrate UpdateByQuery %s", res.String()) + return migrationResponse{}, fmt.Errorf("migrate %s UpdateByQuery failed: %s", + name, res.String()) } - resp := struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Total int `json:"total"` - Updated int `json:"updated"` - Deleted int `json:"deleted"` - Batches int `json:"batches"` - VersionConflicts int `json:"version_conflicts"` - Noops int `json:"noops"` - Retries struct { - Bulk int `json:"bulk"` - Search int `json:"search"` - } `json:"retries"` - Failures []json.RawMessage `json:"failures"` - }{} + resp := migrationResponse{} decoder := json.NewDecoder(res.Body) if err := decoder.Decode(&resp); err != nil { - return 0, errors.Wrap(err, "decode UpdateByQuery response") + return migrationResponse{}, errors.Wrap(err, "decode UpdateByQuery response") } log.Info(). - Int("took", resp.Took). - Bool("timed_out", resp.TimedOut). - Int("total", resp.Total). - Int("updated", resp.Updated). - Int("deleted", resp.Deleted). - Int("batches", resp.Batches). - Int("version_conflicts", resp.VersionConflicts). - Int("noops", resp.Noops). - Int("retries.bulk", resp.Retries.Bulk). - Int("retries.search", resp.Retries.Search). - Dur("rtt", time.Since(start)). - Msg("migrate agent records response") + Str("fleet.migration.name", name). + Int("fleet.migration.es.took", resp.Took). + Bool("fleet.migration.es.timed_out", resp.TimedOut). + Int("fleet.migration.total", resp.Total). + Int("fleet.migration.updated", resp.Updated). + Int("fleet.migration.deleted", resp.Deleted). + Int("fleet.migration.batches", resp.Batches). + Int("fleet.migration.version_conflicts", resp.VersionConflicts). + Int("fleet.migration.noops", resp.Noops). + Int("fleet.migration.retries.bulk", resp.Retries.Bulk). + Int("fleet.migration.retries.search", resp.Retries.Search). + Dur("fleet.migration.total.duration", time.Since(start)). + Msgf("migration %s done", name) for _, fail := range resp.Failures { - log.Error().RawJSON("failure", fail).Msg("migration failure") + log.Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name) + } + + return resp, err +} + +// ============================== V7.15 migration ============================== +func migrateTov7_15(ctx context.Context, bulker bulk.Bulk) error { + log.Debug().Msg("applying migration to v7.15") + _, err := migrate(ctx, bulker, migrateAgentMetadata) + if err != nil { + return fmt.Errorf("v7.15.0 data migration failed: %w", err) + } + + return nil +} + +// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. +// This field was populated in new enrollments in 7.15 and later; however, the +// change was not backported to support 7.14. The security team is reliant on the +// existence of this field in 7.16, so the following migration was added to +// support upgrade from 7.14. +// +// It is currently safe to run this in the background; albeit with some +// concern on conflicts. The conflict risk exists regardless as N Fleet Servers +// can be run in parallel at the same time. +// +// As the update only occurs once, the 99.9% case is a noop. 
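+//
+// For reference, the update-by-query body built below looks roughly like this
+// (a sketch; the exact layout is whatever the dsl query marshals to):
+//
+//	{
+//	  "query": {"bool": {"must_not": {"exists": {"field": "agent.id"}}}},
+//	  "script": "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;"
+//	}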
+func migrateAgentMetadata() (string, string, []byte, error) {
+ const migrationName = "AgentMetadata"
+ query := dsl.NewRoot()
+ query.Query().Bool().MustNot().Exists("agent.id")
+
+ painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;"
+ query.Param("script", painless)
+
+ body, err := query.MarshalJSON()
+ if err != nil {
+ return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err)
+ }
+
+ return migrationName, FleetAgents, body, nil
+}
+
+// ============================== V8.5.0 migration =============================
+// https://github.com/elastic/fleet-server/issues/1672
+
+func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error {
+ log.Debug().Msg("applying migration to v8.5.0")
+ migrated, err := migrate(ctx, bulker, migrateAgentOutputs)
+ if err != nil {
+ return fmt.Errorf("v8.5.0 data migration failed: %w", err)
+ }
+
+ // If the migration was necessary and indeed ran, we need to regenerate
+ // the API keys for all agents. In order to do so, we increase the policy
+ // coordinator index to force a policy update.
+ if migrated > 0 {
+ _, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx)
+ if err != nil {
+ return fmt.Errorf("v8.5.0 data migration failed: %w", err)
+ }
+ }
+
+ return nil
+}
+
+// migrateAgentOutputs performs the necessary changes on the Agent documents
+// to introduce the `Outputs` field.
+//
+// FleetServer 8.5.0 introduces a new field to the Agent document, Outputs, to
+// store the outputs credentials and data. The DefaultAPIKey, DefaultAPIKeyID,
+// DefaultAPIKeyHistory and PolicyOutputPermissionsHash are now deprecated in
+// favour of the new `Outputs` field, which maps the output name to its data.
+// This change fixes https://github.com/elastic/fleet-server/issues/1672.
+//
+// The change is backward compatible as the deprecated fields are just set to
+// their zero value and an older version of FleetServer can repopulate them.
+// However, reverting FleetServer to an older version might cause the very
+// issue this change fixes.
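+//
+// Schematically (a sketch only; the field values are illustrative), an agent
+// document such as
+//
+//	{"default_api_key_id": "keyID", "default_api_key": "keyID:keyValue",
+//	 "policy_output_permissions_hash": "someHash"}
+//
+// becomes
+//
+//	{"default_api_key_id": "", "default_api_key": "",
+//	 "policy_output_permissions_hash": "",
+//	 "outputs": {"default": {
+//	   "type": "elasticsearch", "api_key": "", "api_key_id": "",
+//	   "permissions_hash": "someHash",
+//	   "to_retire_api_key_ids": [{"id": "keyID", "retired_at": "<now>"}]}}}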
+func migrateAgentOutputs() (string, string, []byte, error) {
+ const (
+ migrationName = "AgentOutputs"
+ fieldOutputs = "outputs"
+ fieldRetiredAt = "retiredAt"
+ )
+
+ query := dsl.NewRoot()
+ query.Query().Bool().MustNot().Exists(fieldOutputs)
+
+ fields := map[string]interface{}{fieldRetiredAt: timeNow().UTC().Format(time.RFC3339)}
+ painless := `
+// set up the new fields
+ctx._source['` + fieldOutputs + `']=new HashMap();
+ctx._source['` + fieldOutputs + `']['default']=new HashMap();
+ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=new ArrayList();
+
+// copy 'default_api_key_history' to new 'outputs' field
+ctx._source['` + fieldOutputs + `']['default'].type="elasticsearch";
+if (ctx._source.default_api_key_history != null && ctx._source.default_api_key_history.length > 0) {
+ ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=ctx._source.default_api_key_history;
+}
+
+Map map = new HashMap();
+map.put("retired_at", params.` + fieldRetiredAt + `);
+map.put("id", ctx._source.default_api_key_id);
+
+// Make current API key empty, so fleet-server will generate a new one
+// Add current API key to be retired
+ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids.add(map);
+ctx._source['` + fieldOutputs + `']['default'].api_key="";
+ctx._source['` + fieldOutputs + `']['default'].api_key_id="";
+ctx._source['` + fieldOutputs + `']['default'].permissions_hash=ctx._source.policy_output_permissions_hash;
+
+// Erase deprecated fields
+ctx._source.default_api_key_history=null;
+ctx._source.default_api_key="";
+ctx._source.default_api_key_id="";
+ctx._source.policy_output_permissions_hash="";
+`
+ query.Param("script", map[string]interface{}{
+ "lang": "painless",
+ "source": painless,
+ "params": fields,
+ })
+
+ body, err := query.MarshalJSON()
+ if err != nil {
+ return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err)
+ }
+
+ return migrationName, FleetAgents, body, nil
+}
+
+// migratePolicyCoordinatorIdx increases the policy's CoordinatorIdx to force
+// a policy update ensuring the output data will be migrated to the new
+// Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672
+// for details.
+func migratePolicyCoordinatorIdx() (string, string, []byte, error) {
+ const migrationName = "PolicyCoordinatorIdx"
+
+ query := dsl.NewRoot()
+ query.Query().MatchAll()
+ query.Param("script", `ctx._source.coordinator_idx++;`)
+
+ body, err := query.MarshalJSON()
+ if err != nil {
+ return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err)
 }
- return resp.VersionConflicts, err
+ return migrationName, FleetPolicies, body, nil
 }
diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go
new file mode 100644
index 000000000..55a2e61d7
--- /dev/null
+++ b/internal/pkg/dl/migration_integration_test.go
@@ -0,0 +1,295 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+ +//go:build integration + +package dl + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" +) + +const nowStr = "2022-08-12T16:50:05Z" + +func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bulker bulk.Bulk) []string { + t.Helper() + + var createdAgents []string + + for i := 0; i < n; i++ { + outputAPIKey := bulk.APIKey{ + ID: fmt.Sprint(apiKey.ID, i), + Key: fmt.Sprint(apiKey.Key, i), + } + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + DefaultAPIKeyID: outputAPIKey.ID, + DefaultAPIKey: outputAPIKey.Agent(), + PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i), + DefaultAPIKeyHistory: []model.ToRetireAPIKeyIdsItems{ + { + ID: "old_" + outputAPIKey.ID, + RetiredAt: nowStr, + }, + }, + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + createdAgents = append(createdAgents, agentID) + } + + return createdAgents +} + +func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []string { + t.Helper() + + var created []string + + for i := 0; i < n; i++ { + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + policyModel := model.Policy{ + ESDocument: model.ESDocument{}, + CoordinatorIdx: int64(i), + Data: nil, + DefaultFleetServer: false, + PolicyID: fmt.Sprint(i), + RevisionIdx: 1, + Timestamp: nowStr, + UnenrollTimeout: 0, + } + + body, err := json.Marshal(policyModel) + require.NoError(t, err) + + policyDocID, err := bulker.Create( + context.Background(), index, "", body, bulk.WithRefresh()) + require.NoError(t, err) + + created = append(created, policyDocID) + } + + return created +} + +func TestPolicyCoordinatorIdx(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetPolicies) + + docIDs := createSomePolicies(t, 25, index, bulker) + + migrated, err := migrate(context.Background(), bulker, migratePolicyCoordinatorIdx) + require.NoError(t, err) + + require.Equal(t, len(docIDs), migrated) + + for i := range docIDs { + policies, err := QueryLatestPolicies( + context.Background(), bulker, WithIndexName(index)) + if err != nil { + assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails + continue + } + + var got model.Policy + for _, p := range policies { + if p.PolicyID == fmt.Sprint(i) { + got = p + } + } + + assert.Equal(t, int64(i+1), got.CoordinatorIdx) + } +} + +func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { + now, err := time.Parse(time.RFC3339, nowStr) + require.NoError(t, err, "could not parse time "+nowStr) + timeNow = func() time.Time { + return now + } + + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + apiKey := bulk.APIKey{ + ID: "testAgent_", + Key: "testAgent_key_", + } + + agentIDs := createSomeAgents(t, 25, apiKey, 
index, bulker)
+
+ migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs)
+ require.NoError(t, err)
+
+ assert.Equal(t, len(agentIDs), migratedAgents)
+
+ for i, id := range agentIDs {
+ wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication
+
+ got, err := FindAgent(
+ context.Background(), bulker, QueryAgentByID, FieldID, id, WithIndexName(index))
+ if err != nil {
+ assert.NoError(t, err, "failed to find agent ID %q", id) // we want to continue even if a single agent fails
+ continue
+ }
+
+ wantToRetireAPIKeyIds := []model.ToRetireAPIKeyIdsItems{
+ {
+ // The current API key should be marked to retire after the migration
+ ID: fmt.Sprintf("%s%d", apiKey.ID, i),
+ RetiredAt: timeNow().UTC().Format(time.RFC3339)},
+ {
+ ID: fmt.Sprintf("old_%s%d", apiKey.ID, i),
+ RetiredAt: nowStr},
+ }
+
+ // Assert new fields
+ require.Len(t, got.Outputs, 1)
+ // Default API key is empty to force fleet-server to regenerate it.
+ assert.Empty(t, got.Outputs["default"].APIKey)
+ assert.Empty(t, got.Outputs["default"].APIKeyID)
+
+ assert.Equal(t, wantOutputType, got.Outputs["default"].Type)
+ assert.Equal(t,
+ fmt.Sprint("a_output_permission_SHA_", i),
+ got.Outputs["default"].PermissionsHash)
+
+ // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order.
+ for _, want := range wantToRetireAPIKeyIds {
+ var found bool
+ for _, got := range got.Outputs["default"].ToRetireAPIKeyIds {
+ found = found || cmp.Equal(want, got)
+ }
+ if !found {
+ t.Errorf("could not find %#v in %#v",
+ want, got.Outputs["default"].ToRetireAPIKeyIds)
+ }
+ }
+
+ // Assert deprecated fields
+ assert.Empty(t, got.DefaultAPIKey)
+ assert.Empty(t, got.DefaultAPIKeyID)
+ assert.Empty(t, got.PolicyOutputPermissionsHash)
+ assert.Nil(t, got.DefaultAPIKeyHistory)
+ }
+}
+
+func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) {
+ wantOutputType := "elasticsearch"
+
+ now, err := time.Parse(time.RFC3339, nowStr)
+ require.NoError(t, err, "could not parse time "+nowStr)
+ timeNow = func() time.Time {
+ return now
+ }
+
+ index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents)
+ apiKey := bulk.APIKey{
+ ID: "testAgent_",
+ Key: "testAgent_key_",
+ }
+
+ i := 0
+ outputAPIKey := bulk.APIKey{
+ ID: fmt.Sprint(apiKey.ID, i),
+ Key: fmt.Sprint(apiKey.Key, i),
+ }
+
+ agentID := uuid.Must(uuid.NewV4()).String()
+ policyID := uuid.Must(uuid.NewV4()).String()
+
+ agentModel := model.Agent{
+ PolicyID: policyID,
+ Active: true,
+ LastCheckin: nowStr,
+ LastCheckinStatus: "",
+ UpdatedAt: nowStr,
+ EnrolledAt: nowStr,
+ DefaultAPIKeyID: outputAPIKey.ID,
+ DefaultAPIKey: outputAPIKey.Agent(),
+ PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i),
+ }
+
+ body, err := json.Marshal(agentModel)
+ require.NoError(t, err)
+
+ _, err = bulker.Create(
+ context.Background(), index, agentID, body, bulk.WithRefresh())
+ require.NoError(t, err)
+
+ migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs)
+ require.NoError(t, err)
+
+ got, err := FindAgent(
+ context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index))
+ require.NoError(t, err, "failed to find agent ID %q", agentID)
+
+ assert.Equal(t, 1, migratedAgents)
+
+ // Assert new fields
+ require.Len(t, got.Outputs, 1)
+ // Default API key is empty to force fleet-server to regenerate it.
+ assert.Empty(t, got.Outputs["default"].APIKey)
+ assert.Empty(t, got.Outputs["default"].APIKeyID)
+ assert.Equal(t, wantOutputType, got.Outputs["default"].Type)
+ assert.Equal(t,
+ fmt.Sprint("a_output_permission_SHA_", i),
+ got.Outputs["default"].PermissionsHash)
+
+ // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order.
+ if assert.Len(t, got.Outputs["default"].ToRetireAPIKeyIds, 1) {
+ assert.Equal(t,
+ model.ToRetireAPIKeyIdsItems{ID: outputAPIKey.ID, RetiredAt: nowStr},
+ got.Outputs["default"].ToRetireAPIKeyIds[0])
+ }
+
+ // Assert deprecated fields
+ assert.Empty(t, got.DefaultAPIKey)
+ assert.Empty(t, got.DefaultAPIKeyID)
+ assert.Empty(t, got.PolicyOutputPermissionsHash)
+ assert.Nil(t, got.DefaultAPIKeyHistory)
+}
+
+func TestMigrateOutputs_no_agent_document(t *testing.T) {
+ now, err := time.Parse(time.RFC3339, nowStr)
+ require.NoError(t, err, "could not parse time "+nowStr)
+ timeNow = func() time.Time {
+ return now
+ }
+
+ _, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents)
+
+ migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs)
+ require.NoError(t, err)
+
+ assert.Equal(t, 0, migratedAgents)
+}
diff --git a/internal/pkg/es/bulk_update_api_key.go b/internal/pkg/es/bulk_update_api_key.go
index 8ad66b187..e75df2996 100644
--- a/internal/pkg/es/bulk_update_api_key.go
+++ b/internal/pkg/es/bulk_update_api_key.go
@@ -15,15 +15,16 @@ import (
 "context"
 "io"
 "net/http"
- "strings"
+ "strings"
 
 "github.com/elastic/go-elasticsearch/v7/esapi"
 )
 
 const updateAPIKeyPath = "/_security/api_key/_bulk_update"
 
-type UpdateApiKeyBulk func (o ...func(*UpdateApiKeyBulkRequest))(*Response,error)
-type UpdateApiKeyBulkRequest struct{
+type UpdateApiKeyBulk func(o ...func(*UpdateApiKeyBulkRequest)) (*Response, error)
+
+type UpdateApiKeyBulkRequest struct {
 Body io.Reader
 
 Header http.Header
 
@@ -34,50 +35,49 @@ type UpdateApiKeyBulkRequest struct{
 
 // Do executes the request and returns response or error.
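// It POSTs the body to the _security/api_key/_bulk_update endpoint (see
// updateAPIKeyPath above) through the given esapi.Transport and wraps the
// raw HTTP response in an esapi.Response; this summarizes the code below.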
// func (r UpdateApiKeyBulkRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) { - var path strings.Builder - - path.Grow(len(updateAPIKeyPath)) - path.WriteString(updateAPIKeyPath) - - req, err := newRequest(http.MethodPost, path.String(), r.Body) - if err != nil { - return nil, err - } - - if r.Body != nil { - req.Header[headerContentType] = headerContentTypeJSON - } - - if len(r.Header) > 0 { - if len(req.Header) == 0 { - req.Header = r.Header - } else { - for k, vv := range r.Header { - for _, v := range vv { - req.Header.Add(k, v) - } - } - } - } - - if ctx != nil { - req = req.WithContext(ctx) - } - - res, err := transport.Perform(req) - if err != nil { - return nil, err - } - - response := esapi.Response{ - StatusCode: res.StatusCode, - Body: res.Body, - Header: res.Header, - } - - return &response, nil -} + var path strings.Builder + + path.Grow(len(updateAPIKeyPath)) + path.WriteString(updateAPIKeyPath) + + req, err := newRequest(http.MethodPost, path.String(), r.Body) + if err != nil { + return nil, err + } + if r.Body != nil { + req.Header[headerContentType] = headerContentTypeJSON + } + + if len(r.Header) > 0 { + if len(req.Header) == 0 { + req.Header = r.Header + } else { + for k, vv := range r.Header { + for _, v := range vv { + req.Header.Add(k, v) + } + } + } + } + + if ctx != nil { + req = req.WithContext(ctx) + } + + res, err := transport.Perform(req) + if err != nil { + return nil, err + } + + response := esapi.Response{ + StatusCode: res.StatusCode, + Body: res.Body, + Header: res.Header, + } + + return &response, nil +} // WithContext sets the request context. // @@ -95,7 +95,6 @@ func (f UpdateApiKeyBulkRequest) WithBody(v io.Reader) func(*UpdateApiKeyBulkReq } } - // WithHeader adds the headers to the HTTP request. 
//
func (f UpdateApiKeyBulkRequest) WithHeader(h map[string]string) func(*UpdateApiKeyBulkRequest) {
@@ -107,4 +106,4 @@ func (f UpdateApiKeyBulkRequest) WithHeader(h map[string]string) func(*UpdateApi
 r.Header.Add(k, v)
 }
 }
-}
\ No newline at end of file
+}
diff --git a/internal/pkg/es/error.go b/internal/pkg/es/error.go
index 79b07499c..a5e575df5 100644
--- a/internal/pkg/es/error.go
+++ b/internal/pkg/es/error.go
@@ -37,17 +37,25 @@ func (e ErrElastic) Error() string {
 // Otherwise were getting: "elastic fail 404::"
 msg := "elastic fail "
 var b strings.Builder
- b.Grow(len(msg) + 5 + len(e.Type) + len(e.Reason))
+ b.Grow(len(msg) + 11 + len(e.Type) + len(e.Reason) + len(e.Cause.Type) + len(e.Cause.Reason))
 b.WriteString(msg)
 b.WriteString(strconv.Itoa(e.Status))
 if e.Type != "" {
- b.WriteString(":")
+ b.WriteString(": ")
 b.WriteString(e.Type)
 }
 if e.Reason != "" {
- b.WriteString(":")
+ b.WriteString(": ")
 b.WriteString(e.Reason)
 }
+ if e.Cause.Type != "" {
+ b.WriteString(": ")
+ b.WriteString(e.Cause.Type)
+ }
+ if e.Cause.Reason != "" {
+ b.WriteString(": ")
+ b.WriteString(e.Cause.Reason)
+ }

 return b.String()
}
@@ -83,8 +91,8 @@ func TranslateError(status int, e *ErrorT) error {
 Type string
 Reason string
 }{
- e.Cause.Type,
- e.Cause.Reason,
+ Type: e.Cause.Type,
+ Reason: e.Cause.Reason,
 },
 }
 }
diff --git a/internal/pkg/model/ext.go b/internal/pkg/model/ext.go
index d89787855..4a11bbe08 100644
--- a/internal/pkg/model/ext.go
+++ b/internal/pkg/model/ext.go
@@ -27,14 +27,36 @@ func (m *Server) SetTime(t time.Time) {
 }

 // CheckDifferentVersion returns Agent version if it is different from ver, otherwise return empty string
-func (m *Agent) CheckDifferentVersion(ver string) string {
- if m == nil {
+func (a *Agent) CheckDifferentVersion(ver string) string {
+ if a == nil {
 return ""
 }

- if m.Agent == nil || ver != m.Agent.Version {
+ if a.Agent == nil || ver != a.Agent.Version {
 return ver
 }

 return ""
}
+
+// APIKeyIDs returns all the API keys: the valid, in-use ones as well as the
+// ones marked to be retired.
+func (a *Agent) APIKeyIDs() []string {
+ if a == nil {
+ return nil
+ }
+ keys := make([]string, 0, len(a.Outputs)+1)
+ if a.AccessAPIKeyID != "" {
+ keys = append(keys, a.AccessAPIKeyID)
+ }
+
+ for _, output := range a.Outputs {
+ keys = append(keys, output.APIKeyID)
+ for _, key := range output.ToRetireAPIKeyIds {
+ keys = append(keys, key.ID)
+ }
+ }
+
+ return keys
+}
diff --git a/internal/pkg/model/ext_test.go b/internal/pkg/model/ext_test.go
index e48194b30..527570270 100644
--- a/internal/pkg/model/ext_test.go
+++ b/internal/pkg/model/ext_test.go
@@ -2,15 +2,13 @@
 // or more contributor license agreements. Licensed under the Elastic License;
 // you may not use this file except in compliance with the Elastic License.
-//go:build !integration -// +build !integration - package model import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" ) func TestAgentGetNewVersion(t *testing.T) { @@ -85,3 +83,54 @@ func TestAgentGetNewVersion(t *testing.T) { }) } } + +func TestAgentAPIKeyIDs(t *testing.T) { + tcs := []struct { + name string + agent Agent + want []string + }{ + { + name: "no API key marked to be retired", + agent: Agent{ + AccessAPIKeyID: "access_api_key_id", + Outputs: map[string]*PolicyOutput{ + "p1": {APIKeyID: "p1_api_key_id"}, + "p2": {APIKeyID: "p2_api_key_id"}, + }, + }, + want: []string{"access_api_key_id", "p1_api_key_id", "p2_api_key_id"}, + }, + { + name: "with API key marked to be retired", + agent: Agent{ + AccessAPIKeyID: "access_api_key_id", + Outputs: map[string]*PolicyOutput{ + "p1": { + APIKeyID: "p1_api_key_id", + ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ + ID: "p1_to_retire_key", + }}}, + "p2": { + APIKeyID: "p2_api_key_id", + ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ + ID: "p2_to_retire_key", + }}}, + }, + }, + want: []string{ + "access_api_key_id", "p1_api_key_id", "p2_api_key_id", + "p1_to_retire_key", "p2_to_retire_key"}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.agent.APIKeyIDs() + + // if A contains B and B contains A => A = B + assert.Subset(t, tc.want, got) + assert.Subset(t, got, tc.want) + }) + } +} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 9fb0d07fc..5d3c844f1 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -124,13 +124,13 @@ type Agent struct { Active bool `json:"active"` Agent *AgentMetadata `json:"agent,omitempty"` - // API key the Elastic Agent uses to authenticate with elasticsearch + // Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKey string `json:"default_api_key,omitempty"` - // Default API Key History - DefaultAPIKeyHistory []DefaultAPIKeyHistoryItems `json:"default_api_key_history,omitempty"` + // Deprecated. Use Outputs instead. Default API Key History + DefaultAPIKeyHistory []ToRetireAPIKeyIdsItems `json:"default_api_key_history,omitempty"` - // ID of the API key the Elastic Agent uses to authenticate with elasticsearch + // Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKeyID string `json:"default_api_key_id,omitempty"` // Date/time the Elastic Agent enrolled @@ -148,6 +148,9 @@ type Agent struct { // Local metadata information for the Elastic Agent LocalMetadata json.RawMessage `json:"local_metadata,omitempty"` + // Outputs is the policy output data, mapping the output name to its data + Outputs map[string]*PolicyOutput `json:"outputs,omitempty"` + // Packages array Packages []string `json:"packages,omitempty"` @@ -157,7 +160,7 @@ type Agent struct { // The policy ID for the Elastic Agent PolicyID string `json:"policy_id,omitempty"` - // The policy output permissions hash + // Deprecated. Use Outputs instead. 
The policy output permissions hash PolicyOutputPermissionsHash string `json:"policy_output_permissions_hash,omitempty"` // The current policy revision_idx for the Elastic Agent @@ -250,16 +253,6 @@ type Body struct { type Data struct { } -// DefaultAPIKeyHistoryItems -type DefaultAPIKeyHistoryItems struct { - - // API Key identifier - ID string `json:"id,omitempty"` - - // Date/time the API key was retired - RetiredAt string `json:"retired_at,omitempty"` -} - // EnrollmentAPIKey An Elastic Agent enrollment API key type EnrollmentAPIKey struct { ESDocument @@ -336,6 +329,26 @@ type PolicyLeader struct { Timestamp string `json:"@timestamp,omitempty"` } +// PolicyOutput holds the needed data to manage the output API keys +type PolicyOutput struct { + ESDocument + + // API key the Elastic Agent uses to authenticate with elasticsearch + APIKey string `json:"api_key"` + + // ID of the API key the Elastic Agent uses to authenticate with elasticsearch + APIKeyID string `json:"api_key_id"` + + // The policy output permissions hash + PermissionsHash string `json:"permissions_hash"` + + // API keys to be invalidated on next agent ack + ToRetireAPIKeyIds []ToRetireAPIKeyIdsItems `json:"to_retire_api_key_ids,omitempty"` + + // Type is the output type. Currently only Elasticsearch is supported. + Type string `json:"type"` +} + // Server A Fleet Server type Server struct { ESDocument @@ -357,6 +370,16 @@ type ServerMetadata struct { Version string `json:"version"` } +// ToRetireAPIKeyIdsItems the Output API Keys that were replaced and should be retired +type ToRetireAPIKeyIdsItems struct { + + // API Key identifier + ID string `json:"id,omitempty"` + + // Date/time the API key was retired + RetiredAt string `json:"retired_at,omitempty"` +} + // UserProvidedMetadata User provided metadata information for the Elastic Agent type UserProvidedMetadata struct { } diff --git a/internal/pkg/policy/parsed_policy.go b/internal/pkg/policy/parsed_policy.go index dbf5d3801..029298ef5 100644 --- a/internal/pkg/policy/parsed_policy.go +++ b/internal/pkg/policy/parsed_policy.go @@ -42,7 +42,7 @@ type ParsedPolicy struct { Policy model.Policy Fields map[string]json.RawMessage Roles RoleMapT - Outputs map[string]PolicyOutput + Outputs map[string]Output Default ParsedPolicyDefaults } @@ -91,8 +91,8 @@ func NewParsedPolicy(p model.Policy) (*ParsedPolicy, error) { return pp, nil } -func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]PolicyOutput, error) { - result := make(map[string]PolicyOutput) +func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]Output, error) { + result := make(map[string]Output) outputsMap, err := smap.Parse(outputsRaw) if err != nil { @@ -102,7 +102,7 @@ func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) for k := range outputsMap { v := outputsMap.GetMap(k) - p := PolicyOutput{ + p := Output{ Name: k, Type: v.GetString(FieldOutputType), } @@ -126,13 +126,13 @@ func parsePerms(permsRaw json.RawMessage) (RoleMapT, error) { // iterate across the keys m := make(RoleMapT, len(permMap)) for k := range permMap { - v := permMap.GetMap(k) if v != nil { var r RoleT // Stable hash on permissions payload + // permission hash created here if r.Sha2, err = v.Hash(); err != nil { return nil, err } diff --git a/internal/pkg/policy/parsed_policy_test.go b/internal/pkg/policy/parsed_policy_test.go index 547cfcf7a..32ef271a7 100644 --- a/internal/pkg/policy/parsed_policy_test.go +++ 
b/internal/pkg/policy/parsed_policy_test.go @@ -13,7 +13,6 @@ import ( ) func TestNewParsedPolicy(t *testing.T) { - // Run two formatting of the same payload to validate that the sha2 remains the same payloads := []string{ testPolicy, diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index e260a6a30..55683cf51 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -32,163 +32,208 @@ var ( ErrFailInjectAPIKey = errors.New("fail inject api key") ) -type PolicyOutput struct { +type Output struct { Name string Type string Role *RoleT } -func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { +// Prepare prepares the output p to be sent to the elastic-agent +// The agent might be mutated for an elasticsearch output +func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { + zlog = zlog.With(). + Str("fleet.agent.id", agent.Id). + Str("fleet.policy.output.name", p.Name).Logger() + switch p.Type { case OutputTypeElasticsearch: zlog.Debug().Msg("preparing elasticsearch output") - - // The role is required to do api key management - if p.Role == nil { - zlog.Error().Str("name", p.Name).Msg("policy does not contain required output permission section") - return ErrNoOutputPerms + if err := p.prepareElasticsearch(ctx, zlog, bulker, agent, outputMap); err != nil { + return fmt.Errorf("failed to prepare elasticsearch output %q: %w", p.Name, err) } + case OutputTypeLogstash: + zlog.Debug().Msg("preparing logstash output") + zlog.Info().Msg("no actions required for logstash output preparation") + default: + zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) + return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) + } + return nil +} - // Determine whether we need to generate an output ApiKey. - // This is accomplished by comparing the sha2 hash stored in the agent - // record with the precalculated sha2 hash of the role. - - // Note: This will need to be updated when doing multi-cluster elasticsearch support - // Currently, we only have access to the token for the elasticsearch instance fleet-server - // is monitors. When updating for multiple ES instances we need to tie the token to the output. - needNewKey := false - needUpdateKey := false - switch { - case agent.DefaultAPIKey == "": - zlog.Debug().Msg("must generate api key as default API key is not present") - needNewKey = true - case p.Role.Sha2 != agent.PolicyOutputPermissionsHash: - zlog.Debug().Msg("must generate api key as policy output permissions changed") - needUpdateKey = true - default: - zlog.Debug().Msg("policy output permissions are the same") - } +func (p *Output) prepareElasticsearch( + ctx context.Context, + zlog zerolog.Logger, + bulker bulk.Bulk, + agent *model.Agent, + outputMap smap.Map) error { + // The role is required to do api key management + if p.Role == nil { + zlog.Error(). + Msg("policy does not contain required output permission section") + return ErrNoOutputPerms + } - if needUpdateKey { - zlog.Debug(). - RawJSON("roles", p.Role.Raw). - Str("oldHash", agent.PolicyOutputPermissionsHash). - Str("newHash", p.Role.Sha2). 
- Msg("Updating an API key") - - // query current api key for roles so we don't lose permissions in the meantime - currentRoles, err := fetchAPIKeyRoles(ctx, bulker, agent.DefaultAPIKeyID) - if err != nil { - zlog.Error(). - Str("apiKeyID", agent.DefaultAPIKeyID). - Err(err).Msg("fail fetching roles for key") - return err - } + output, foundOutput := agent.Outputs[p.Name] + if !foundOutput { + if agent.Outputs == nil { + agent.Outputs = map[string]*model.PolicyOutput{} + } - // merge roles with p.Role - newRoles, err := mergeRoles(zlog, currentRoles, p.Role) - if err != nil { - zlog.Error(). - Str("apiKeyID", agent.DefaultAPIKeyID). - Err(err).Msg("fail merging roles for key") - return err - } + zlog.Debug().Msgf("creating agent.Outputs[%s]", p.Name) + output = &model.PolicyOutput{} + agent.Outputs[p.Name] = output + } - // hash provided is only for merging request together and not persisted - err = bulker.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, newRoles.Sha2, newRoles.Raw) - if err != nil { - zlog.Error().Err(err).Msg("fail generate output key") - zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated") - return err - } + // Determine whether we need to generate an output ApiKey. + // This is accomplished by comparing the sha2 hash stored in the corresponding + // output in the agent record with the precalculated sha2 hash of the role. + + // Note: This will need to be updated when doing multi-cluster elasticsearch support + // Currently, we assume all ES outputs are the same ES fleet-server is connected to. + needNewKey := false + needUpdateKey := false + switch { + case output.APIKey == "": + zlog.Debug().Msg("must generate api key as default API key is not present") + needNewKey = true + case p.Role.Sha2 != output.PermissionsHash: + // the is actually the OutputPermissionsHash for the default hash. The Agent + // document on ES does not have OutputPermissionsHash for any other output + // besides the default one. It seems to me error-prone to rely on the default + // output permissions hash to generate new API keys for other outputs. + zlog.Debug().Msg("must generate api key as policy output permissions changed") + needUpdateKey = true + default: + zlog.Debug().Msg("policy output permissions are the same") + } - zlog.Debug(). - Str("hash.sha256", p.Role.Sha2). - Str("roles", string(p.Role.Raw)). - Msg("Updating agent record to pick up most recent roles.") + if needUpdateKey { + zlog.Debug(). + RawJSON("roles", p.Role.Raw). + Str("oldHash", output.PermissionsHash). + Str("newHash", p.Role.Sha2). + Msg("Generating a new API key") + + // query current api key for roles so we don't lose permissions in the meantime + currentRoles, err := fetchAPIKeyRoles(ctx, bulker, output.APIKeyID) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). + Err(err).Msg("fail fetching roles for key") + return err + } - fields := map[string]interface{}{ - dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, - } + // merge roles with p.Role + newRoles, err := mergeRoles(zlog, currentRoles, p.Role) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). 
+ Err(err).Msg("fail merging roles for key")
+ return err
+ }

- // Using painless script to update permission hash for updated key
- body, err := renderUpdatePainlessScript(fields)
- if err != nil {
- return err
- }
+ // hash provided is only for merging request together and not persisted
+ err = bulker.APIKeyUpdate(ctx, output.APIKeyID, newRoles.Sha2, newRoles.Raw)
+ if err != nil {
+ zlog.Error().Err(err).Msg("fail generate output key")
+ zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated")
+ return err
+ }

- if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
- zlog.Error().Err(err).Msg("fail update agent record")
- return err
- }
+ output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
+ zlog.Debug().
+ Str("hash.sha256", p.Role.Sha2).
+ Str("roles", string(p.Role.Raw)).
+ Msg("Updating agent record to pick up most recent roles.")

- } else if needNewKey {
- zlog.Debug().
- RawJSON("roles", p.Role.Raw).
- Str("oldHash", agent.PolicyOutputPermissionsHash).
- Str("newHash", p.Role.Sha2).
- Msg("Generating a new API key")
-
- outputAPIKey, err := generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw)
- if err != nil {
- zlog.Error().Err(err).Msg("fail generate output key")
- return err
- }
+ fields := map[string]interface{}{
+ dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
+ }

- agent.DefaultAPIKey = outputAPIKey.Agent()
+ // Using painless script to update permission hash for updated key
+ body, err := renderUpdatePainlessScript(p.Name, fields)
+ if err != nil {
+ return err
+ }

- // When a new keys is generated we need to update the Agent record,
- // this will need to be updated when multiples Elasticsearch output
- // are used.
- zlog.Info().
- Str("hash.sha256", p.Role.Sha2).
- Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID).
- Msg("Updating agent record to pick up default output key.")
+ if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
+ zlog.Error().Err(err).Msg("fail update agent record")
+ return err
+ }

- fields := map[string]interface{}{
- dl.FieldDefaultAPIKey: outputAPIKey.Agent(),
- dl.FieldDefaultAPIKeyID: outputAPIKey.ID,
- dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
- }
- if agent.DefaultAPIKeyID != "" {
- fields[dl.FieldDefaultAPIKeyHistory] = model.DefaultAPIKeyHistoryItems{
- ID: agent.DefaultAPIKeyID,
- RetiredAt: time.Now().UTC().Format(time.RFC3339),
- }
- }
+ } else if needNewKey {
+ zlog.Debug().
+ RawJSON("fleet.policy.roles", p.Role.Raw).
+ Str("fleet.policy.default.oldHash", output.PermissionsHash).
+ Str("fleet.policy.default.newHash", p.Role.Sha2).
+ Msg("Generating a new API key")
+
+ ctx := zlog.WithContext(ctx)
+ outputAPIKey, err :=
+ generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw)
+ if err != nil {
+ return fmt.Errorf("failed to generate output API key: %w", err)
+ }

- // Using painless script to append the old keys to the history
- body, err := renderUpdatePainlessScript(fields)
+ // When a new key is generated we need to update the Agent record;
+ // this will need to be updated when multiple remote Elasticsearch outputs
+ // are supported.
+ zlog.Info().
+ Str("fleet.policy.role.hash.sha256", p.Role.Sha2).
+ Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID).
+ Msg("Updating agent record to pick up default output key.") + + fields := map[string]interface{}{ + dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), + dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } - if err != nil { - return err + if !foundOutput { + fields[dl.FiledType] = OutputTypeElasticsearch + } + if output.APIKeyID != "" { + fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ + ID: output.APIKeyID, + RetiredAt: time.Now().UTC().Format(time.RFC3339), } + } - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { - zlog.Error().Err(err).Msg("fail update agent record") - return err - } + // Using painless script to append the old keys to the history + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return fmt.Errorf("could no tupdate painless script: %w", err) } - // Always insert the `api_key` as part of the output block, this is required - // because only fleet server knows the api key for the specific agent, if we don't - // add it the agent will not receive the `api_key` and will not be able to connect - // to Elasticsearch. - // - // We need to investigate allocation with the new LS output, we had optimization - // in place to reduce number of agent policy allocation when sending the updated - // agent policy to multiple agents. - // See: https://github.com/elastic/fleet-server/issues/1301 - if ok := setMapObj(outputMap, agent.DefaultAPIKey, p.Name, "api_key"); !ok { - return ErrFailInjectAPIKey + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return fmt.Errorf("fail update agent record: %w", err) } - case OutputTypeLogstash: - zlog.Debug().Msg("preparing logstash output") - zlog.Info().Msg("no actions required for logstash output preparation") - default: - zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) - return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) + + // Now that all is done, we can update the output on the agent variable + // Right not it's more for consistency and to ensure the in-memory agent + // data is correct and in sync with ES, so it can be safely used after + // this method returns. + output.Type = OutputTypeElasticsearch + output.APIKey = outputAPIKey.Agent() + output.APIKeyID = outputAPIKey.ID + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency } + + // Always insert the `api_key` as part of the output block, this is required + // because only fleet server knows the api key for the specific agent, if we don't + // add it the agent will not receive the `api_key` and will not be able to connect + // to Elasticsearch. + // + // We need to investigate allocation with the new LS output, we had optimization + // in place to reduce number of agent policy allocation when sending the updated + // agent policy to multiple agents. 
+ // See: https://github.com/elastic/fleet-server/issues/1301
+ if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil {
+ return err
+ }
+
 return nil
}
@@ -261,11 +306,6 @@ func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) {
 }
 }

- // creates new key for permissions
- // marks it stale so they can be removed later
- if _, exists := m[candidate]; !exists {
- return fmt.Sprintf("%s-0-rdstale", candidate)
- }

 // 1 should be enough, 100 is just to have some space
 for i := 0; i < 100; i++ {
@@ -305,13 +345,32 @@ func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) {
 return r, nil
}

-func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) {
+func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) {
 var source strings.Builder
+
+ // prepare ctx._source['outputs'][OUTPUT_NAME]
+ source.WriteString(fmt.Sprintf(`
+if (ctx._source['outputs']==null)
+ {ctx._source['outputs']=new HashMap();}
+if (ctx._source['outputs']['%s']==null)
+ {ctx._source['outputs']['%s']=new HashMap();}
+`, outputName, outputName))
+
 for field := range fields {
- if field == dl.FieldDefaultAPIKeyHistory {
- source.WriteString(fmt.Sprint("if (ctx._source.", field, "==null) {ctx._source.", field, "=new ArrayList();} ctx._source.", field, ".add(params.", field, ");"))
+ if field == dl.FieldPolicyOutputToRetireAPIKeyIDs {
+ // dl.FieldPolicyOutputToRetireAPIKeyIDs is a special case.
+ // It's an array that gets deleted when the keys are invalidated.
+ // Thus, append the old API key ID, create the field if necessary.
+ source.WriteString(fmt.Sprintf(`
+if (ctx._source['outputs']['%s'].%s==null)
+ {ctx._source['outputs']['%s'].%s=new ArrayList();}
+ctx._source['outputs']['%s'].%s.add(params.%s);
+`, outputName, field, outputName, field, outputName, field, field))
 } else {
- source.WriteString(fmt.Sprint("ctx._source.", field, "=", "params.", field, ";"))
+ // Update the other fields
+ source.WriteString(fmt.Sprintf(`
+ctx._source['outputs']['%s'].%s=params.%s;`,
+ outputName, field, field))
 }
 }

@@ -326,36 +385,45 @@ func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) {
 return body, err
}

-func generateOutputAPIKey(ctx context.Context, bulk bulk.Bulk, agentID, outputName string, roles []byte) (*apikey.APIKey, error) {
+func generateOutputAPIKey(
+ ctx context.Context,
+ bulk bulk.Bulk,
+ agentID,
+ outputName string,
+ roles []byte) (*apikey.APIKey, error) {
 name := fmt.Sprintf("%s:%s", agentID, outputName)
+ zerolog.Ctx(ctx).Info().Msgf("generating output API key %s for agent ID %s",
+ name, agentID)
 return bulk.APIKeyCreate(
 ctx, name, "", roles,
- apikey.NewMetadata(agentID, apikey.TypeOutput),
+ apikey.NewMetadata(agentID, outputName, apikey.TypeOutput),
 )
}

-func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) bool {
+func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) error {
 if len(keys) == 0 {
- return false
+ return fmt.Errorf("no key to be updated: %w", ErrFailInjectAPIKey)
 }

 for _, k := range keys[:len(keys)-1] {
 v, ok := obj[k]
 if !ok {
- return false
+ return fmt.Errorf("key %q not present on MapObj: %w",
+ k, ErrFailInjectAPIKey)
 }

 obj, ok = v.(map[string]interface{})
 if !ok {
- return false
+ return fmt.Errorf("cannot cast %T to map[string]interface{}: %w",
+ obj, ErrFailInjectAPIKey)
 }
 }

 k := keys[len(keys)-1]
 obj[k] = val

- return true
+ return nil
}
diff --git
a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go new file mode 100644 index 000000000..5c8a254b8 --- /dev/null +++ b/internal/pkg/policy/policy_output_integration_test.go @@ -0,0 +1,200 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package policy + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" +) + +func TestRenderUpdatePainlessScript(t *testing.T) { + tts := []struct { + name string + + existingToRetireAPIKeyIds []model.ToRetireAPIKeyIdsItems + }{ + { + name: "to_retire_api_key_ids is empty", + }, + { + name: "to_retire_api_key_ids is not empty", + existingToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{{ + ID: "pre_existing_ID", RetiredAt: "pre_existing__RetiredAt"}}, + }, + } + + for _, tt := range tts { + t.Run(tt.name, func(t *testing.T) { + outputPermissionSha := "new_permissionSHA_" + tt.name + outputName := "output_" + tt.name + outputAPIKey := bulk.APIKey{ID: "new_ID", Key: "new-key"} + + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + previousAPIKey := bulk.APIKey{ + ID: "old_" + outputAPIKey.ID, + Key: "old_" + outputAPIKey.Key, + } + + wantOutputs := map[string]*model.PolicyOutput{ + outputName: { + APIKey: outputAPIKey.Agent(), + APIKeyID: outputAPIKey.ID, + PermissionsHash: outputPermissionSha, + Type: OutputTypeElasticsearch, + ToRetireAPIKeyIds: append(tt.existingToRetireAPIKeyIds, + model.ToRetireAPIKeyIdsItems{ + ID: previousAPIKey.ID, RetiredAt: nowStr}), + }, + } + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + Outputs: map[string]*model.PolicyOutput{ + outputName: { + Type: OutputTypeElasticsearch, + APIKey: previousAPIKey.Agent(), + APIKeyID: previousAPIKey.ID, + PermissionsHash: "old_" + outputPermissionSha, + }, + }, + } + if tt.existingToRetireAPIKeyIds != nil { + agentModel.Outputs[outputName].ToRetireAPIKeyIds = + tt.existingToRetireAPIKeyIds + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + fields := map[string]interface{}{ + dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), + dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: outputPermissionSha, + dl.FieldPolicyOutputToRetireAPIKeyIDs: model.ToRetireAPIKeyIdsItems{ + ID: previousAPIKey.ID, RetiredAt: nowStr}, + } + + got, err := renderUpdatePainlessScript(outputName, fields) + require.NoError(t, err, 
"renderUpdatePainlessScript returned an unexpected error") + + err = bulker.Update(context.Background(), dl.FleetAgents, agentID, got) + require.NoError(t, err, "bulker.Update failed") + + // there is some refresh thing that needs time, I didn't manage to find + // how ot fix it at the requests to ES level, thus this timeout here. + time.Sleep(time.Second) + + gotAgent, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + require.NoError(t, err) + + assert.Equal(t, agentID, gotAgent.Id) + assert.Len(t, gotAgent.Outputs, len(wantOutputs)) + assert.Equal(t, wantOutputs, gotAgent.Outputs) + }) + } +} + +func TestPolicyOutputESPrepareRealES(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + + agentID := createAgent(t, index, bulker) + agent, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + if err != nil { + require.NoError(t, err, "failed to find agent ID %q", agentID) + } + + output := Output{ + Type: OutputTypeElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "new-hash", + Raw: TestPayload, + }, + } + policyMap := smap.Map{ + "test output": map[string]interface{}{}, + } + + err = output.prepareElasticsearch( + context.Background(), zerolog.Nop(), bulker, &agent, policyMap) + require.NoError(t, err) + + // need to wait a bit before querying the agent again + // TODO: find a better way to query the updated agent + time.Sleep(time.Second) + + got, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + if err != nil { + require.NoError(t, err, "failed to find agent ID %q", agentID) + } + + gotOutput, ok := got.Outputs[output.Name] + require.True(t, ok, "no '%s' output fouled on agent document", output.Name) + + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + assert.Equal(t, gotOutput.Type, OutputTypeElasticsearch) + assert.Equal(t, gotOutput.PermissionsHash, output.Role.Sha2) + assert.NotEmpty(t, gotOutput.APIKey) + assert.NotEmpty(t, gotOutput.APIKeyID) +} + +func createAgent(t *testing.T, index string, bulker bulk.Bulk) string { + const nowStr = "2022-08-12T16:50:05Z" + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + return agentID +} diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index 89b05bdbf..f74d57b3e 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -8,6 +8,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -23,7 +24,7 @@ var TestPayload []byte func TestPolicyLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -39,7 +40,7 @@ func TestPolicyLogstashOutputPrepare(t *testing.T) { func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { logger := 
testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: nil, @@ -54,7 +55,7 @@ func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -71,7 +72,7 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { func TestPolicyESOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: nil, @@ -86,8 +87,11 @@ func TestPolicyOutputESPrepare(t *testing.T) { t.Run("Permission hash == Agent Permission Hash no need to regenerate the key", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() + + apiKey := bulk.APIKey{ID: "test_id_existing", Key: "existing-key"} + hashPerm := "abc123" - po := PolicyOutput{ + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -101,33 +105,64 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - DefaultAPIKey: "test_id:EXISTING-KEY", - PolicyOutputPermissionsHash: hashPerm, + Outputs: map[string]*model.PolicyOutput{ + output.Name: { + ESDocument: model.ESDocument{}, + APIKey: apiKey.Agent(), + ToRetireAPIKeyIds: nil, + APIKeyID: apiKey.ID, + PermissionsHash: hashPerm, + Type: OutputTypeElasticsearch, + }, + }, } - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] - require.True(t, ok, "unable to case api key") - require.Equal(t, testAgent.DefaultAPIKey, key) - bulker.AssertNotCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - bulker.AssertNotCalled(t, "APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + require.True(t, ok, "api key not present on policy map") + assert.Equal(t, apiKey.Agent(), key) + + assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) + assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + + bulker.AssertNotCalled(t, "Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + bulker.AssertNotCalled(t, "APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) bulker.AssertExpectations(t) }) t.Run("Permission hash != Agent Permission Hash need to regenerate permissions", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - bulker.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, 
mock.Anything).Return(nil).Once() + + oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} + wantAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} + hashPerm := "old-HASH" + bulker. On("APIKeyRead", mock.Anything, mock.Anything, mock.Anything). Return(&bulk.APIKeyMetadata{ID: "test_id", RoleDescriptors: TestPayload}, nil). Once() + bulker.On("Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Once() + bulker.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - po := PolicyOutput{ + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -141,27 +176,55 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - DefaultAPIKey: "test_id:EXISTING-KEY", - PolicyOutputPermissionsHash: "old-HASH", + Outputs: map[string]*model.PolicyOutput{ + output.Name: { + ESDocument: model.ESDocument{}, + APIKey: oldAPIKey.Agent(), + ToRetireAPIKeyIds: nil, + APIKeyID: oldAPIKey.ID, + PermissionsHash: hashPerm, + Type: OutputTypeElasticsearch, + }, + }, } - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") - require.Equal(t, testAgent.DefaultAPIKey, key) + require.Equal(t, wantAPIKey.Agent(), key) + + assert.Equal(t, wantAPIKey.Agent(), gotOutput.APIKey) + assert.Equal(t, wantAPIKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + + // assert.Contains(t, gotOutput.ToRetireAPIKeyIds, oldAPIKey.ID) // TODO: assert on bulker.Update + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + bulker.AssertExpectations(t) }) t.Run("Generate API Key on new Agent", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - bulker.On("APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&bulk.APIKey{"abc", "new-key"}, nil).Once() //nolint:govet // test case - - po := PolicyOutput{ + bulker.On("Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Once() + apiKey := bulk.APIKey{ID: "abc", Key: "new-key"} + bulker.On("APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
+ Return(&apiKey, nil).Once() + + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -174,15 +237,29 @@ func TestPolicyOutputESPrepare(t *testing.T) { "test output": map[string]interface{}{}, } - testAgent := &model.Agent{} + testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}} - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") - require.Equal(t, "abc:new-key", key) + assert.Equal(t, apiKey.Agent(), key) + + assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) + assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + bulker.AssertExpectations(t) }) } diff --git a/internal/pkg/testing/esutil/bootstrap.go b/internal/pkg/testing/esutil/bootstrap.go index e2aafce76..978f95a75 100644 --- a/internal/pkg/testing/esutil/bootstrap.go +++ b/internal/pkg/testing/esutil/bootstrap.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" ) -// EnsureIndex sets up the index if it doesn't exists, utilized for integration tests at the moment +// EnsureIndex sets up the index if it doesn't exist. It's utilized for integration tests at the moment. func EnsureIndex(ctx context.Context, cli *elasticsearch.Client, name, mapping string) error { err := EnsureTemplate(ctx, cli, name, mapping, false) if err != nil { diff --git a/internal/pkg/testing/setup.go b/internal/pkg/testing/setup.go index 8dac38cdc..8f38ba7e6 100644 --- a/internal/pkg/testing/setup.go +++ b/internal/pkg/testing/setup.go @@ -98,7 +98,7 @@ func SetupCleanIndex(ctx context.Context, t *testing.T, index string, opts ...bu func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string) string { t.Helper() - t.Helper() + tmpl := dsl.NewTmpl() root := dsl.NewRoot() root.Query().MatchAll() @@ -106,25 +106,25 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin query, err := q.Render(make(map[string]interface{})) if err != nil { - t.Fatal(err) + t.Fatalf("could not clean index: failed to render query template: %v", err) } cli := bulker.Client() + res, err := cli.API.DeleteByQuery([]string{index}, bytes.NewReader(query), cli.API.DeleteByQuery.WithContext(ctx), cli.API.DeleteByQuery.WithRefresh(true), ) - if err != nil { - t.Fatal(err) + t.Fatalf("could not clean index %s, DeleteByQuery failed: %v", + index, err) } defer res.Body.Close() var esres es.DeleteByQueryResponse - err = json.NewDecoder(res.Body).Decode(&esres) if err != nil { - t.Fatal(err) + t.Fatalf("could not decode ES response: %v", err) } if res.IsError() { @@ -135,9 +135,9 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin } } } - if err != nil { - t.Fatal(err) + t.Fatalf("ES returned an error: %v. 
body: %q", err, res) } + return index } diff --git a/model/schema.json b/model/schema.json index 91bda492f..423cfe3fc 100644 --- a/model/schema.json +++ b/model/schema.json @@ -244,6 +244,7 @@ "name" ] }, + "server-metadata": { "title": "Server Metadata", "description": "A Fleet Server metadata", @@ -264,6 +265,7 @@ "version" ] }, + "server": { "title": "Server", "description": "A Fleet Server", @@ -284,6 +286,7 @@ "server" ] }, + "policy": { "title": "Policy", "description": "A policy that an Elastic Agent is attached to", @@ -329,6 +332,7 @@ "default_fleet_server" ] }, + "policy-leader": { "title": "Policy Leader", "description": "The current leader Fleet Server for a policy", @@ -345,6 +349,60 @@ "server" ] }, + + "to_retire_api_key_ids": { + "type": "array", + "items": { + "description": "the Output API Keys that were replaced and should be retired", + "type": "object", + "properties": { + "id": { + "description": "API Key identifier", + "type": "string" + }, + "retired_at": { + "description": "Date/time the API key was retired", + "type": "string", + "format": "date-time" + } + } + } + }, + + "policy_output" : { + "type": "object", + "description": "holds the needed data to manage the output API keys", + "properties": { + "api_key": { + "description": "API key the Elastic Agent uses to authenticate with elasticsearch", + "type": "string" + }, + "to_retire_api_key_ids": { + "description": "API keys to be invalidated on next agent ack", + "$ref": "#/definitions/to_retire_api_key_ids" + }, + "api_key_id": { + "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", + "type": "string" + }, + "permissions_hash": { + "description": "The policy output permissions hash", + "type": "string" + }, + "type": { + "description": "Type is the output type. Currently only Elasticsearch is supported.", + "type": "string" + } + }, + "required": [ + "api_key", + "api_key_history", + "api_key_id", + "permissions_hash", + "type" + ] + }, + "agent": { "title": "Agent", "description": "An Elastic Agent that has enrolled into Fleet", @@ -441,7 +499,7 @@ "type": "integer" }, "policy_output_permissions_hash": { - "description": "The policy output permissions hash", + "description": "Deprecated. Use Outputs instead. The policy output permissions hash", "type": "string" }, "last_updated": { @@ -459,30 +517,21 @@ "type": "string" }, "default_api_key_id": { - "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key": { - "description": "API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key_history": { - "description": "Default API Key History", - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "description": "API Key identifier", - "type": "string" - }, - "retired_at": { - "description": "Date/time the API key was retired", - "type": "string", - "format": "date-time" - } - } - } + "description": "Deprecated. Use Outputs instead. 
Default API Key History", + "$ref": "#/definitions/to_retire_api_key_ids" + }, + "outputs": { + "description": "Outputs is the policy output data, mapping the output name to its data", + "type": "object", + "additionalProperties": { "$ref": "#/definitions/policy_output"} }, "updated_at": { "description": "Date/time the Elastic Agent was last updated", @@ -512,6 +561,7 @@ "status" ] }, + "enrollment_api_key": { "title": "Enrollment API key", "description": "An Elastic Agent enrollment API key", @@ -555,6 +605,7 @@ ] } }, + "checkin": { "title": "Checkin", "description": "An Elastic Agent checkin to Fleet",