diff --git a/dev-tools/integration/.env b/dev-tools/integration/.env
index 8d460e721f..b4bb4fe9f1 100644
--- a/dev-tools/integration/.env
+++ b/dev-tools/integration/.env
@@ -1,4 +1,4 @@
-ELASTICSEARCH_VERSION=8.5.0-c7913db3-SNAPSHOT
+ELASTICSEARCH_VERSION=8.5.0-56d2c52d-SNAPSHOT
 ELASTICSEARCH_USERNAME=elastic
 ELASTICSEARCH_PASSWORD=changeme
 TEST_ELASTICSEARCH_HOSTS=localhost:9200
\ No newline at end of file
diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go
index 1261d8f6c5..7128fe4c8c 100644
--- a/internal/pkg/api/handleAck.go
+++ b/internal/pkg/api/handleAck.go
@@ -26,6 +26,8 @@ import (
     "github.com/elastic/fleet-server/v7/internal/pkg/logger"
     "github.com/elastic/fleet-server/v7/internal/pkg/model"
     "github.com/elastic/fleet-server/v7/internal/pkg/policy"
+    "github.com/elastic/fleet-server/v7/internal/pkg/smap"
+    "github.com/pkg/errors"
 
     "github.com/julienschmidt/httprouter"
     "github.com/rs/zerolog"
@@ -351,7 +353,51 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
         return nil
     }
 
-    ack.invalidateAPIKeys(ctx, agent)
+    if agent.DefaultAPIKeyID != "" {
+        res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true)
+        if err != nil {
+            zlog.Error().
+                Err(err).
+                Str(LogAPIKeyID, agent.DefaultAPIKeyID).
+                Msg("Failed to read API Key roles")
+        } else {
+            clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors)
+            if err != nil {
+                zlog.Error().
+                    Err(err).
+                    RawJSON("roles", res.RoleDescriptors).
+                    Str(LogAPIKeyID, agent.DefaultAPIKeyID).
+                    Msg("Failed to cleanup roles")
+            } else if removedRolesCount > 0 {
+                if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil {
+                    zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key")
+                } else {
+                    zlog.Debug().
+                        Str("hash.sha256", agent.PolicyOutputPermissionsHash).
+                        Str(LogAPIKeyID, agent.DefaultAPIKeyID).
+                        RawJSON("roles", clean).
+                        Int("removedRoles", removedRolesCount).
+                        Msg("Updating agent record to pick up reduced roles.")
+                }
+            }
+        }
+    }
+
+    sz := len(agent.DefaultAPIKeyHistory)
+    if sz > 0 {
+        ids := make([]string, 0, sz)
+        for i := 0; i < sz; i++ {
+            if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID {
+                // already updated
+                continue
+            }
+            ids = append(ids, agent.DefaultAPIKeyHistory[i].ID)
+        }
+        log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
+        if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
+            log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
+        }
+    }
 
     body := makeUpdatePolicyBody(
         agent.PolicyID,
@@ -368,7 +414,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
         bulk.WithRetryOnConflict(3),
     )
 
-    zlog.Info().Err(err).
+    zlog.Err(err).
         Str(LogPolicyID, agent.PolicyID).
         Int64("policyRevision", currRev).
         Int64("policyCoordinator", currCoord).
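The history-invalidation loop above builds `ids` with `append` so that skipping the still-current key does not leave an empty-string entry behind; blank IDs would otherwise be sent to the invalidate API. A standalone sketch of the same filtering, with hypothetical key IDs:

```go
package main

import "fmt"

func main() {
	current := "key-3" // stands in for agent.DefaultAPIKeyID
	history := []string{"key-1", "key-2", "key-3"}

	ids := make([]string, 0, len(history))
	for _, id := range history {
		if id == current {
			continue // still in use, keep it valid
		}
		ids = append(ids, id)
	}
	fmt.Println(ids) // [key-1 key-2]
}
```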
@@ -377,20 +423,29 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
     return errors.Wrap(err, "handlePolicyChange update")
 }
 
-func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) {
-    var ids []string
-    for _, out := range agent.Outputs {
-        for _, k := range out.ToRetireAPIKeyIds {
-            ids = append(ids, k.ID)
-        }
+func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {
+    rr := smap.Map{}
+    if err := json.Unmarshal(roles, &rr); err != nil {
+        return nil, 0, errors.Wrap(err, "failed to unmarshal provided roles")
     }
 
-    if len(ids) > 0 {
-        log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
-        if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
-            log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
+    keys := make([]string, 0, len(rr))
+    for k := range rr {
+        if strings.HasSuffix(k, "-rdstale") {
+            keys = append(keys, k)
         }
     }
+
+    if len(keys) == 0 {
+        return roles, 0, nil
+    }
+
+    for _, k := range keys {
+        delete(rr, k)
+    }
+
+    r, err := json.Marshal(rr)
+    return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition")
 }
 
 func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
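`cleanRoles` drops every role whose name carries the `-rdstale` suffix that `mergeRoles` (further down in this patch) attaches to superseded roles. A minimal, self-contained sketch of the same behavior, with made-up role names:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical role_descriptors payload: one live role, one stale alias.
	roles := map[string]json.RawMessage{
		"policy-123":           json.RawMessage(`{"cluster":["monitor"]}`),
		"policy-123-0-rdstale": json.RawMessage(`{"cluster":["monitor"]}`),
	}
	removed := 0
	for k := range roles {
		if strings.HasSuffix(k, "-rdstale") { // same suffix check as cleanRoles
			delete(roles, k)
			removed++
		}
	}
	out, _ := json.Marshal(roles)
	fmt.Println(removed, string(out)) // 1 {"policy-123":{"cluster":["monitor"]}}
}
```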
diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go
index 7c5b1dd5a4..9123723d6c 100644
--- a/internal/pkg/api/handleEnroll.go
+++ b/internal/pkg/api/handleEnroll.go
@@ -298,7 +298,7 @@ func invalidateAPIKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk
 
 LOOP:
     for {
-        _, err := bulker.APIKeyRead(ctx, apikeyID)
+        _, err := bulker.APIKeyRead(ctx, apikeyID, false)
 
         switch {
         case err == nil:
diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go
index 72f410d999..5e3212f0ae 100644
--- a/internal/pkg/apikey/apikey_integration_test.go
+++ b/internal/pkg/apikey/apikey_integration_test.go
@@ -45,12 +45,13 @@ func TestRead(t *testing.T) {
     }
 
     // Try to get the key that doesn't exist, expect ErrApiKeyNotFound
-    _, err = Read(ctx, es, "0000000000000")
+    _, err = Read(ctx, es, "0000000000000", false)
     if !errors.Is(err, ErrAPIKeyNotFound) {
         t.Errorf("Unexpected error type: %v", err)
     }
 }
 
+
 func TestCreateAPIKeyWithMetadata(t *testing.T) {
     tts := []struct {
         name string
@@ -92,7 +93,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
     }
 
     // Get the API key and verify that the metadata was saved correctly
-    aKeyMeta, err := Read(ctx, es, apiKey.ID)
+    aKeyMeta, err := Read(ctx, es, apiKey.ID, false)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/internal/pkg/bulk/block.go b/internal/pkg/bulk/block.go
index 28c80927e4..c2535172eb 100644
--- a/internal/pkg/bulk/block.go
+++ b/internal/pkg/bulk/block.go
@@ -43,6 +43,7 @@ const (
     ActionDelete
     ActionIndex
     ActionUpdate
+    ActionUpdateAPIKey
     ActionRead
     ActionSearch
     ActionFleetSearch
@@ -53,6 +54,7 @@ var actionStrings = []string{
     "delete",
     "index",
     "update",
+    "update_api_key",
     "read",
     "search",
     "fleet_search",
diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go
index 93420c7805..68840729c2 100644
--- a/internal/pkg/bulk/engine.go
+++ b/internal/pkg/bulk/engine.go
@@ -55,9 +55,10 @@ type Bulk interface {
 
     // APIKey operations
     APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error)
-    APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error)
+    APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error)
     APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error)
     APIKeyInvalidate(ctx context.Context, ids ...string) error
+    APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error
 
     // Accessor used to talk to elastic search direcly bypassing bulk engine
     Client() *elasticsearch.Client
@@ -81,6 +82,7 @@ const (
     defaultMaxPending        = 32
     defaultBlockQueueSz      = 32 // Small capacity to allow multiOp to spin fast
     defaultAPIKeyMaxParallel = 32
+    defaultApikeyMaxReqSize  = 100 * 1024 * 1024
 )
 
 func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
@@ -136,6 +138,8 @@ func blkToQueueType(blk *bulkT) queueType {
         } else {
             queueIdx = kQueueRead
         }
+    case ActionUpdateAPIKey:
+        queueIdx = kQueueAPIKeyUpdate
     default:
         if forceRefresh {
             queueIdx = kQueueRefreshBulk
@@ -288,6 +292,8 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
         err = b.flushRead(ctx, queue)
     case kQueueSearch, kQueueFleetSearch:
         err = b.flushSearch(ctx, queue)
+    case kQueueAPIKeyUpdate:
+        err = b.flushUpdateAPIKey(ctx, queue)
     default:
         err = b.flushBulk(ctx, queue)
     }
diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go
index 049c0ce172..099a7d2917 100644
--- a/internal/pkg/bulk/opApiKey.go
+++ b/internal/pkg/bulk/opApiKey.go
@@ -5,15 +5,36 @@
 package bulk
 
 import (
+    "bytes"
     "context"
+    "encoding/json"
+    "math"
 
     "github.com/elastic/fleet-server/v7/internal/pkg/apikey"
+    "github.com/elastic/fleet-server/v7/internal/pkg/es"
+
+    "github.com/rs/zerolog/log"
+)
+
+const (
+    envelopeSize = 64 // 64B
+    safeBuffer   = 0.9
 )
 
 // The ApiKey API's are not yet bulk enabled. Stub the calls in the bulker
 // and limit parallel access to prevent many requests from overloading
 // the connection pool in the elastic search client.
 
+type apiKeyUpdateRequest struct {
+    ID        string          `json:"id,omitempty"`
+    Roles     json.RawMessage `json:"role_descriptors,omitempty"`
+    RolesHash string          `json:"role_hash,omitempty"`
+}
+
+type esAPIKeyBulkUpdateRequest struct {
+    IDs   []string        `json:"ids,omitempty"`
+    Roles json.RawMessage `json:"role_descriptors,omitempty"`
+}
+
 func (b *Bulker) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) {
     if err := b.apikeyLimit.Acquire(ctx, 1); err != nil {
         return nil, err
@@ -32,13 +53,13 @@ func (b *Bulker) APIKeyCreate(ctx context.Context, name, ttl string, roles []byt
     return apikey.Create(ctx, b.Client(), name, ttl, "false", roles, meta)
 }
 
-func (b *Bulker) APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error) {
+func (b *Bulker) APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) {
     if err := b.apikeyLimit.Acquire(ctx, 1); err != nil {
         return nil, err
     }
     defer b.apikeyLimit.Release(1)
 
-    return apikey.Read(ctx, b.Client(), id)
+    return apikey.Read(ctx, b.Client(), id, withOwner)
 }
 
 func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error {
@@ -49,3 +70,178 @@ func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error {
     return apikey.Invalidate(ctx, b.Client(), ids...)
 }
+
+func (b *Bulker) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error {
+    req := &apiKeyUpdateRequest{
+        ID:        id,
+        Roles:     roles,
+        RolesHash: outputPolicyHash,
+    }
+
+    body, err := json.Marshal(req)
+    if err != nil {
+        return err
+    }
+
+    _, err = b.waitBulkAction(ctx, ActionUpdateAPIKey, "", id, body)
+    return err
+}
+
+// flushUpdateAPIKey takes an API key update queue and groups the requests by the roles they apply.
+// Agent IDs are grouped per role hash so that a single, more efficient request can carry the
+// whole list of IDs for a change (update).
+// Keep in mind that a single queue may contain both a policy change and an ack request with roles;
+// in that case the later occurrence wins, overwriting the policy change with the reduced permission set.
+// Even if the order were wrong we would end up with a slightly broader permission set, never a stricter
+// one, so an agent never ends up with fewer permissions than it needs.
+func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error {
+    idsPerRole := make(map[string][]string)
+    roles := make(map[string]json.RawMessage)
+    rolePerID := make(map[string]string)
+    responses := make(map[int]int)
+    idxToID := make(map[int32]string)
+    IDToResponse := make(map[string]int)
+    maxKeySize := 0
+
+    // merge ids
+    for n := queue.head; n != nil; n = n.next {
+        content := n.buf.Bytes()
+        metaMap := make(map[string]interface{})
+        dec := json.NewDecoder(bytes.NewReader(content))
+        if err := dec.Decode(&metaMap); err != nil {
+            log.Error().
+                Err(err).
+                Str("mod", kModBulk).
+                Msg("Failed to unmarshal api key update meta map")
+            return err
+        }
+
+        var req *apiKeyUpdateRequest
+        if err := dec.Decode(&req); err != nil {
+            log.Error().
+                Err(err).
+                Str("mod", kModBulk).
+                Str("request", string(content)).
+                Msg("Failed to unmarshal api key update request")
+            return err
+        }
+
+        if _, tracked := roles[req.RolesHash]; !tracked {
+            roles[req.RolesHash] = req.Roles
+        }
+
+        // last one wins; a policy change and an ack may be in the same queue
+        rolePerID[req.ID] = req.RolesHash
+        idxToID[n.idx] = req.ID
+        if maxKeySize < len(req.ID) {
+            maxKeySize = len(req.ID)
+        }
+    }
+
+    for id, roleHash := range rolePerID {
+        delete(rolePerID, id)
+        idsPerRole[roleHash] = append(idsPerRole[roleHash], id)
+    }
+
+    responseIdx := 0
+    for hash, role := range roles {
+        idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize)
+        ids := idsPerRole[hash]
+        if idsPerBatch <= 0 {
+            log.Error().Str("err", "request too large").Msg("No API Key ID could fit request size for bulk update")
+            log.Debug().
+                RawJSON("role", role).
+                Strs("ids", ids).
+                Msg("IDs could not fit into a message")
+
+            // idsPerRole for specific role no longer needed
+            delete(idsPerRole, hash)
+            continue
+        }
+
+        batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch)))
+
+        // split ids into batches of meaningful size
+        for batch := 0; batch < batches; batch++ {
+            // guard against indexing out of range
+            to := (batch + 1) * idsPerBatch
+            if to > len(ids) {
+                to = len(ids)
+            }
+
+            // handle ids in batch, we put them into single request
+            // and assign response index to the id so we can notify caller
+            idsInBatch := ids[batch*idsPerBatch : to]
+            bulkReq := &esAPIKeyBulkUpdateRequest{
+                IDs:   idsInBatch,
+                Roles: role,
+            }
+            delete(roles, hash)
+
+            payload, err := json.Marshal(bulkReq)
+            if err != nil {
+                return err
+            }
+
+            req := &es.UpdateApiKeyBulkRequest{
+                Body: bytes.NewReader(payload),
+            }
+
+            res, err := req.Do(ctx, b.es)
+            if err != nil {
+                log.Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch")
+                return err
+            }
+            if res.Body != nil {
+                defer res.Body.Close()
+            }
+            if res.IsError() {
+                log.Error().Str("err", res.String()).Msg("Error in bulk API Key update result to Elasticsearch")
+                return parseError(res)
+            }
+
+            log.Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.")
+
+            responses[responseIdx] = res.StatusCode
+            for _, id := range idsInBatch {
+                IDToResponse[id] = responseIdx
+            }
+            responseIdx++
+        }
+
+        // idsPerRole for specific role no longer needed
+        delete(idsPerRole, hash)
+    }
+
+    // WARNING: Once we start pushing items to
+    // the queue, the node pointers are invalid.
+    // Do NOT return a non-nil value or failQueue
+    // up the stack will fail.
+
+    for n := queue.head; n != nil; n = n.next {
+        // 'n' is invalid immediately on channel send
+        responseIdx := IDToResponse[idxToID[n.idx]]
+        res := responses[responseIdx]
+        select {
+        case n.ch <- respT{
+            err: nil,
+            idx: n.idx,
+            data: &BulkIndexerResponseItem{
+                DocumentID: "",
+                Status:     res,
+            },
+        }:
+        default:
+            panic("Unexpected blocked response channel on flushUpdateAPIKey")
+        }
+    }
+    return nil
+}
+
+func (b *Bulker) getIDsCountPerBatch(roleSize, maxKeySize int) int {
+    spareSpace := b.opts.apikeyMaxReqSize - roleSize - envelopeSize
+    if spareSpace > maxKeySize {
+        return int(float64(spareSpace) * safeBuffer / float64(maxKeySize))
+    }
+    return 0
+}
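For a sense of scale of `getIDsCountPerBatch`: with the 100MB default cap, even a generous role payload leaves room for millions of key IDs per batch, so multiple batches should be rare in practice. A standalone recomputation of the formula (the role and key sizes here are assumptions, not values from the patch):

```go
package main

import "fmt"

func main() {
	const (
		maxReqSize   = 100 * 1024 * 1024 // defaultApikeyMaxReqSize
		envelopeSize = 64
		safeBuffer   = 0.9
	)
	roleSize, maxKeySize := 2048, 22 // assumed: ~2KiB role JSON, 22-byte key IDs

	// same arithmetic as getIDsCountPerBatch
	spare := maxReqSize - roleSize - envelopeSize
	if spare > maxKeySize {
		fmt.Println(int(float64(spare) * safeBuffer / float64(maxKeySize))) // 4289542
	}
}
```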
diff --git a/internal/pkg/bulk/opt.go b/internal/pkg/bulk/opt.go
index e0701823eb..6eeb2fe217 100644
--- a/internal/pkg/bulk/opt.go
+++ b/internal/pkg/bulk/opt.go
@@ -62,6 +62,7 @@ type bulkOptT struct {
     maxPending        int
     blockQueueSz      int
     apikeyMaxParallel int
+    apikeyMaxReqSize  int
 }
 
 type BulkOpt func(*bulkOptT)
@@ -108,6 +109,15 @@ func WithAPIKeyMaxParallel(max int) BulkOpt {
     }
 }
 
+// WithAPIKeyMaxRequestSize sets the maximum size of the request body. Default 100MB.
+func WithAPIKeyMaxRequestSize(maxBytes int) BulkOpt {
+    return func(opt *bulkOptT) {
+        if maxBytes > 0 {
+            opt.apikeyMaxReqSize = maxBytes
+        }
+    }
+}
+
 func parseBulkOpts(opts ...BulkOpt) bulkOptT {
     bopt := bulkOptT{
         flushInterval:     defaultFlushInterval,
@@ -116,6 +126,7 @@ func parseBulkOpts(opts ...BulkOpt) bulkOptT {
         maxPending:        defaultMaxPending,
         apikeyMaxParallel: defaultAPIKeyMaxParallel,
         blockQueueSz:      defaultBlockQueueSz,
+        apikeyMaxReqSize:  defaultApikeyMaxReqSize,
     }
 
     for _, f := range opts {
@@ -132,6 +143,7 @@ func (o *bulkOptT) MarshalZerologObject(e *zerolog.Event) {
     e.Int("maxPending", o.maxPending)
     e.Int("blockQueueSz", o.blockQueueSz)
     e.Int("apikeyMaxParallel", o.apikeyMaxParallel)
+    e.Int("apikeyMaxReqSize", o.apikeyMaxReqSize)
 }
 
 // BulkOptsFromCfg transforms config to a slize of BulkOpt
@@ -152,5 +164,6 @@ func BulkOptsFromCfg(cfg *config.Config) []BulkOpt {
         WithFlushThresholdSize(bulkCfg.FlushThresholdSize),
         WithMaxPending(bulkCfg.FlushMaxPending),
         WithAPIKeyMaxParallel(maxKeyParallel),
+        WithAPIKeyMaxRequestSize(cfg.Output.Elasticsearch.MaxContentLength),
     }
 }
diff --git a/internal/pkg/bulk/queue.go b/internal/pkg/bulk/queue.go
index dc7bde5d15..f2060212a3 100644
--- a/internal/pkg/bulk/queue.go
+++ b/internal/pkg/bulk/queue.go
@@ -20,6 +20,7 @@ const (
     kQueueFleetSearch
     kQueueRefreshBulk
     kQueueRefreshRead
+    kQueueAPIKeyUpdate
     kNumQueues
 )
 
@@ -37,6 +38,8 @@ func (q queueT) Type() string {
         return "refreshBulk"
     case kQueueRefreshRead:
         return "refreshRead"
+    case kQueueAPIKeyUpdate:
+        return "apiKeyUpdate"
     }
     panic("unknown")
 }
diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go
index 4e3d7ea616..110f937835 100644
--- a/internal/pkg/config/config_test.go
+++ b/internal/pkg/config/config_test.go
@@ -237,12 +237,13 @@ func defaultFleet() Fleet {
 
 func defaultElastic() Elasticsearch {
     return Elasticsearch{
-        Protocol:       "http",
-        ServiceToken:   "test-token",
-        Hosts:          []string{"localhost:9200"},
-        MaxRetries:     3,
-        MaxConnPerHost: 128,
-        Timeout:        90 * time.Second,
+        Protocol:         "http",
+        ServiceToken:     "test-token",
+        Hosts:            []string{"localhost:9200"},
+        MaxRetries:       3,
+        MaxConnPerHost:   128,
+        MaxContentLength: 104857600,
+        Timeout:          90 * time.Second,
     }
 }
diff --git a/internal/pkg/config/output.go b/internal/pkg/config/output.go
index 5804d58583..8e8751d45a 100644
--- a/internal/pkg/config/output.go
+++ b/internal/pkg/config/output.go
@@ -28,19 +28,20 @@ var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`)
 
 // Elasticsearch is the configuration for elasticsearch.
 type Elasticsearch struct {
-    Protocol       string            `config:"protocol"`
-    Hosts          []string          `config:"hosts"`
-    Path           string            `config:"path"`
-    Headers        map[string]string `config:"headers"`
-    APIKey         string            `config:"api_key"`
-    ServiceToken   string            `config:"service_token"`
-    ProxyURL       string            `config:"proxy_url"`
-    ProxyDisable   bool              `config:"proxy_disable"`
-    ProxyHeaders   map[string]string `config:"proxy_headers"`
-    TLS            *tlscommon.Config `config:"ssl"`
-    MaxRetries     int               `config:"max_retries"`
-    MaxConnPerHost int               `config:"max_conn_per_host"`
-    Timeout        time.Duration     `config:"timeout"`
+    Protocol         string            `config:"protocol"`
+    Hosts            []string          `config:"hosts"`
+    Path             string            `config:"path"`
+    Headers          map[string]string `config:"headers"`
+    APIKey           string            `config:"api_key"`
+    ServiceToken     string            `config:"service_token"`
+    ProxyURL         string            `config:"proxy_url"`
+    ProxyDisable     bool              `config:"proxy_disable"`
+    ProxyHeaders     map[string]string `config:"proxy_headers"`
+    TLS              *tlscommon.Config `config:"ssl"`
+    MaxRetries       int               `config:"max_retries"`
+    MaxConnPerHost   int               `config:"max_conn_per_host"`
+    Timeout          time.Duration     `config:"timeout"`
+    MaxContentLength int               `config:"max_content_length"`
 }
 
 // InitDefaults initializes the defaults for the configuration.
@@ -50,6 +51,7 @@ func (c *Elasticsearch) InitDefaults() {
     c.Timeout = 90 * time.Second
     c.MaxRetries = 3
     c.MaxConnPerHost = 128
+    c.MaxContentLength = 100 * 1024 * 1024
 }
 
 // Validate ensures that the configuration is valid.
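`WithAPIKeyMaxRequestSize` guards on the caller-supplied `maxBytes`, so a zero or negative `max_content_length` from config leaves the 100MB default from `parseBulkOpts` in place. A minimal reproduction of that functional-option pattern (all names here are local to the sketch, not part of the patch):

```go
package main

import "fmt"

type opts struct{ apikeyMaxReqSize int }

func withMaxReqSize(maxBytes int) func(*opts) {
	return func(o *opts) {
		if maxBytes > 0 { // guard on the argument, not on the field
			o.apikeyMaxReqSize = maxBytes
		}
	}
}

func main() {
	o := opts{apikeyMaxReqSize: 100 * 1024 * 1024} // default, as in parseBulkOpts
	withMaxReqSize(0)(&o)                          // ignored, default kept
	withMaxReqSize(50 * 1024 * 1024)(&o)           // applied
	fmt.Println(o.apikeyMaxReqSize)                // 52428800
}
```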
diff --git a/internal/pkg/es/bulk_update_api_key.go b/internal/pkg/es/bulk_update_api_key.go
new file mode 100644
index 0000000000..8ad66b1875
--- /dev/null
+++ b/internal/pkg/es/bulk_update_api_key.go
@@ -0,0 +1,110 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// Code generated from specification version 7.x: DO NOT EDIT
+
+// This is a copy of the api.search.go file from the go-elasticsearch library,
+// originally adapted for the experimental /_fleet/_fleet_search API implemented
+// by the custom fleet plugin (https://github.com/elastic/elasticsearch/pull/73134)
+// and modified here for the /_security/api_key/_bulk_update endpoint.
+// This file can be removed and replaced with the official client library wrapper once it is available.
+
+package es
+
+import (
+    "context"
+    "io"
+    "net/http"
+    "strings"
+
+    "github.com/elastic/go-elasticsearch/v7/esapi"
+)
+
+const updateAPIKeyPath = "/_security/api_key/_bulk_update"
+
+type UpdateApiKeyBulk func(o ...func(*UpdateApiKeyBulkRequest)) (*Response, error)
+
+type UpdateApiKeyBulkRequest struct {
+    Body io.Reader
+
+    Header http.Header
+
+    ctx context.Context
+}
+
+// Do executes the request and returns response or error.
+//
+func (r UpdateApiKeyBulkRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) {
+    var path strings.Builder
+
+    path.Grow(len(updateAPIKeyPath))
+    path.WriteString(updateAPIKeyPath)
+
+    req, err := newRequest(http.MethodPost, path.String(), r.Body)
+    if err != nil {
+        return nil, err
+    }
+
+    if r.Body != nil {
+        req.Header[headerContentType] = headerContentTypeJSON
+    }
+
+    if len(r.Header) > 0 {
+        if len(req.Header) == 0 {
+            req.Header = r.Header
+        } else {
+            for k, vv := range r.Header {
+                for _, v := range vv {
+                    req.Header.Add(k, v)
+                }
+            }
+        }
+    }
+
+    if ctx != nil {
+        req = req.WithContext(ctx)
+    }
+
+    res, err := transport.Perform(req)
+    if err != nil {
+        return nil, err
+    }
+
+    response := esapi.Response{
+        StatusCode: res.StatusCode,
+        Body:       res.Body,
+        Header:     res.Header,
+    }
+
+    return &response, nil
+}
+
+// WithContext sets the request context.
+//
+func (f UpdateApiKeyBulkRequest) WithContext(v context.Context) func(*UpdateApiKeyBulkRequest) {
+    return func(r *UpdateApiKeyBulkRequest) {
+        r.ctx = v
+    }
+}
+
+// WithBody sets the request body (the bulk update definition).
+//
+func (f UpdateApiKeyBulkRequest) WithBody(v io.Reader) func(*UpdateApiKeyBulkRequest) {
+    return func(r *UpdateApiKeyBulkRequest) {
+        r.Body = v
+    }
+}
+
+// WithHeader adds the headers to the HTTP request.
+//
+func (f UpdateApiKeyBulkRequest) WithHeader(h map[string]string) func(*UpdateApiKeyBulkRequest) {
+    return func(r *UpdateApiKeyBulkRequest) {
+        if r.Header == nil {
+            r.Header = make(http.Header)
+        }
+        for k, v := range h {
+            r.Header.Add(k, v)
+        }
+    }
+}
\ No newline at end of file
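A hedged usage sketch for the new request type (not part of the patch; assumes a reachable cluster and sufficient security privileges). The body shape mirrors `esAPIKeyBulkUpdateRequest` from `opApiKey.go`, and `*elasticsearch.Client` satisfies `esapi.Transport`:

```go
package main

import (
	"bytes"
	"context"
	"log"

	elasticsearch "github.com/elastic/go-elasticsearch/v7"

	"github.com/elastic/fleet-server/v7/internal/pkg/es"
)

func main() {
	client, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical key IDs; role_descriptors left empty on purpose.
	body := []byte(`{"ids":["key-id-1","key-id-2"],"role_descriptors":{}}`)

	req := es.UpdateApiKeyBulkRequest{Body: bytes.NewReader(body)}
	res, err := req.Do(context.Background(), client)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Body.Close()
	log.Println(res.StatusCode)
}
```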
diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go
index fefd192d3a..e460bbd8a6 100644
--- a/internal/pkg/policy/policy_output.go
+++ b/internal/pkg/policy/policy_output.go
@@ -91,22 +91,78 @@ func (p *Output) prepareElasticsearch(
     // Note: This will need to be updated when doing multi-cluster elasticsearch support
     // Currently, we assume all ES outputs are the same ES fleet-server is connected to.
-    needNewKey := true
+    needNewKey := false
+    needUpdateKey := false
     switch {
     case output.APIKey == "":
         zlog.Debug().Msg("must generate api key as default API key is not present")
+        needNewKey = true
     case p.Role.Sha2 != output.PermissionsHash:
         // the is actually the OutputPermissionsHash for the default hash. The Agent
         // document on ES does not have OutputPermissionsHash for any other output
         // besides the default one. It seems to me error-prone to rely on the default
         // output permissions hash to generate new API keys for other outputs.
         zlog.Debug().Msg("must generate api key as policy output permissions changed")
+        needUpdateKey = true
     default:
-        needNewKey = false
         zlog.Debug().Msg("policy output permissions are the same")
     }
 
-    if needNewKey {
+    if needUpdateKey {
+        zlog.Debug().
+            RawJSON("roles", p.Role.Raw).
+            Str("oldHash", output.PermissionsHash).
+            Str("newHash", p.Role.Sha2).
+            Msg("Updating API key to pick up changed permissions")
+
+        // query the current API key's roles so we don't lose permissions in the meantime
+        currentRoles, err := fetchAPIKeyRoles(ctx, bulker, output.APIKeyID)
+        if err != nil {
+            zlog.Error().
+                Str("apiKeyID", output.APIKeyID).
+                Err(err).Msg("fail fetching roles for key")
+            return err
+        }
+
+        // merge roles with p.Role
+        newRoles, err := mergeRoles(zlog, currentRoles, p.Role)
+        if err != nil {
+            zlog.Error().
+                Str("apiKeyID", output.APIKeyID).
+                Err(err).Msg("fail merging roles for key")
+            return err
+        }
+
+        // the hash provided here is only used to group update requests together; it is not persisted
+        err = bulker.APIKeyUpdate(ctx, output.APIKeyID, newRoles.Sha2, newRoles.Raw)
+        if err != nil {
+            zlog.Error().Err(err).Msg("fail update output key")
+            zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated")
+            return err
+        }
+
+        output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
+        zlog.Debug().
+            Str("hash.sha256", p.Role.Sha2).
+            Str("roles", string(p.Role.Raw)).
+            Msg("Updating agent record to pick up most recent roles.")
+
+        fields := map[string]interface{}{
+            dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
+        }
+
+        // Using painless script to update permission hash for updated key
+        body, err := renderUpdatePainlessScript(p.Name, fields)
+        if err != nil {
+            return err
+        }
+
+        if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
+            zlog.Error().Err(err).Msg("fail update agent record")
+            return err
+        }
+
+    } else if needNewKey {
         zlog.Debug().
             RawJSON("fleet.policy.roles", p.Role.Raw).
             Str("fleet.policy.default.oldHash", output.PermissionsHash).
@@ -181,6 +237,114 @@ func (p *Output) prepareElasticsearch(
     return nil
 }
 
+func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) {
+    res, err := b.APIKeyRead(ctx, apiKeyID, true)
+    if err != nil {
+        return nil, err
+    }
+
+    roleMap, err := smap.Parse(res.RoleDescriptors)
+    if err != nil {
+        return nil, err
+    }
+    r := &RoleT{
+        Raw: res.RoleDescriptors,
+    }
+
+    // Stable hash on permissions payload
+    if r.Sha2, err = roleMap.Hash(); err != nil {
+        return nil, err
+    }
+
+    return r, nil
+}
+
+// mergeRoles takes old and new role sets and merges them following these rules:
+// - take all new roles
+// - append all old roles
+// To avoid name collisions, every old entry is renamed with a `-{index}-rdstale` suffix,
+// picking the first free index; if the old name itself already ends in `-{index}-rdstale`,
+// the base name is reused so suffixes do not stack.
+// Everything ending with `rdstale` is removed on ack.
+// In case we have key `123` in both old and new, the result is: {"123", "123-0-rdstale"}.
+// In case old contains {"123", "123-0-rdstale"} and new contains {"123"}, the result is:
+// {"123", "123-0-rdstale", "123-1-rdstale"}.
+func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) {
+    if old == nil {
+        return new, nil
+    }
+    if new == nil {
+        return old, nil
+    }
+
+    oldMap, err := smap.Parse(old.Raw)
+    if err != nil {
+        return nil, err
+    }
+    if oldMap == nil {
+        return new, nil
+    }
+
+    newMap, err := smap.Parse(new.Raw)
+    if err != nil {
+        return nil, err
+    }
+    if newMap == nil {
+        return old, nil
+    }
+
+    destMap := smap.Map{}
+    // copy all from new
+    for k, v := range newMap {
+        destMap[k] = v
+    }
+
+    findNewKey := func(m smap.Map, candidate string) string {
+        if strings.HasSuffix(candidate, "-rdstale") {
+            candidate = strings.TrimSuffix(candidate, "-rdstale")
+            dashIdx := strings.LastIndex(candidate, "-")
+            if dashIdx >= 0 {
+                candidate = candidate[:dashIdx]
+            }
+        }
+
+        // 1 should be enough, 100 is just to have some space
+        for i := 0; i < 100; i++ {
+            c := fmt.Sprintf("%s-%d-rdstale", candidate, i)
+
+            if _, exists := m[c]; !exists {
+                return c
+            }
+        }
+
+        return ""
+    }
+    // copy old
+    for k, v := range oldMap {
+        newKey := findNewKey(destMap, k)
+        if newKey == "" {
+            zlog.Warn().Msg("Failed to find a key for role assignment.")
+            zlog.Debug().
+                RawJSON("roles", new.Raw).
+                Str("candidate", k).
+                Msg("roles not included.")
+            continue
+        }
+        destMap[newKey] = v
+    }
+
+    r := &RoleT{}
+    if r.Sha2, err = destMap.Hash(); err != nil {
+        return nil, err
+    }
+    if r.Raw, err = json.Marshal(destMap); err != nil {
+        return nil, err
+    }
+
+    return r, nil
+}
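A standalone walk-through of the renaming rules, mirroring the second example in the comment above (role names are made up; only stdlib is used):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	oldRoles := []string{"123", "123-0-rdstale"}
	newRoles := []string{"123"}

	dest := map[string]bool{}
	for _, k := range newRoles {
		dest[k] = true
	}
	for _, k := range oldRoles {
		// strip an existing "-{index}-rdstale" suffix so suffixes don't stack
		if strings.HasSuffix(k, "-rdstale") {
			k = strings.TrimSuffix(k, "-rdstale")
			if i := strings.LastIndex(k, "-"); i >= 0 {
				k = k[:i]
			}
		}
		for i := 0; i < 100; i++ { // first free index wins, as in findNewKey
			if c := fmt.Sprintf("%s-%d-rdstale", k, i); !dest[c] {
				dest[c] = true
				break
			}
		}
	}

	keys := make([]string, 0, len(dest))
	for k := range dest {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	fmt.Println(keys) // [123 123-0-rdstale 123-1-rdstale]
}
```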
+
 func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) {
     var source strings.Builder
 
@@ -262,4 +426,4 @@ func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) erro
     obj[k] = val
 
     return nil
-}
+}
\ No newline at end of file
diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go
index d66275d04b..9da306c357 100644
--- a/internal/pkg/policy/policy_output_test.go
+++ b/internal/pkg/policy/policy_output_test.go
@@ -145,20 +145,22 @@ func TestPolicyOutputESPrepare(t *testing.T) {
         bulker.AssertExpectations(t)
     })
 
-    t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) {
+    t.Run("Permission hash != Agent Permission Hash need to regenerate permissions", func(t *testing.T) {
         logger := testlog.SetLogger(t)
         bulker := ftesting.NewMockBulk()
 
         oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"}
-        wantAPIKey := bulk.APIKey{ID: "abc", Key: "new-key"}
+        wantAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"}
         hashPerm := "old-HASH"
 
+        bulker.
+            On("APIKeyRead", mock.Anything, mock.Anything, mock.Anything).
+            Return(&bulk.APIKeyMetadata{ID: "test_id", RoleDescriptors: TestPayload}, nil).
+            Once()
         bulker.On("Update",
             mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
             Return(nil).Once()
-        bulker.On("APIKeyCreate",
-            mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
-            Return(&wantAPIKey, nil).Once()
+        bulker.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
 
         output := Output{
             Type: OutputTypeElasticsearch,
@@ -260,4 +262,4 @@ func TestPolicyOutputESPrepare(t *testing.T) {
 
         bulker.AssertExpectations(t)
     })
-}
+}
\ No newline at end of file
diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go
index 1123232b79..724d540865 100644
--- a/internal/pkg/testing/bulk.go
+++ b/internal/pkg/testing/bulk.go
@@ -83,7 +83,7 @@ func (m *MockBulk) APIKeyCreate(ctx context.Context, name, ttl string, roles []b
     return args.Get(0).(*bulk.APIKey), args.Error(1)
 }
 
-func (m *MockBulk) APIKeyRead(ctx context.Context, id string) (*bulk.APIKeyMetadata, error) {
-    args := m.Called(ctx, id)
+func (m *MockBulk) APIKeyRead(ctx context.Context, id string, withOwner bool) (*bulk.APIKeyMetadata, error) {
+    args := m.Called(ctx, id, withOwner)
     return args.Get(0).(*bulk.APIKeyMetadata), args.Error(1)
 }
 
@@ -98,4 +98,9 @@ func (m *MockBulk) APIKeyInvalidate(ctx context.Context, ids ...string) error {
     return args.Error(0)
 }
 
+func (m *MockBulk) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error {
+    args := m.Called(ctx, id, outputPolicyHash, roles)
+    return args.Error(0)
+}
+
 var _ bulk.Bulk = (*MockBulk)(nil)
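A note on the mocks at the end: testify matches an expectation by method name and by the arguments passed to `m.Called`, so the mocks forward every parameter. Otherwise the three-matcher `On("APIKeyRead", ...)` and four-matcher `On("APIKeyUpdate", ...)` expectations set up in the test above would never match. A minimal self-contained illustration (the `fakeBulk` type is invented for this sketch):

```go
package main

import (
	"context"
	"fmt"

	"github.com/stretchr/testify/mock"
)

type fakeBulk struct{ mock.Mock }

func (m *fakeBulk) APIKeyUpdate(ctx context.Context, id, hash string, roles []byte) error {
	// forwarding only (ctx, id) here would not match the 4-argument expectation below
	args := m.Called(ctx, id, hash, roles)
	return args.Error(0)
}

func main() {
	m := &fakeBulk{}
	m.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(nil).Once()
	fmt.Println(m.APIKeyUpdate(context.Background(), "key-id", "hash", nil)) // <nil>
}
```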