Skip to content

Commit

Permalink
Fix and reintroduce "Allow multiple ES outputs as long as they are th…
Browse files Browse the repository at this point in the history
…e same ES" (#1879)

* Revert "Revert "Fix v8.5.0 migration painless script" (#1878)"
  This reverts commit ef9ca2b.

* Revert "Revert "Allow multiple ES outputs as long as they are the same ES (#1684)""
  This reverts commit bb696ac.

* avoid new API keys being marked for invalidation

Co-authored-by: Michal Pristas <michal.pristas@gmail.com>
  He fixed the merge conflicts after Bulk API Keys update (#1779), commit 46ac14b, got merged
  • Loading branch information
AndersonQ authored Sep 20, 2022
1 parent 46ac14b commit ca9041f
Show file tree
Hide file tree
Showing 32 changed files with 1,708 additions and 558 deletions.
14 changes: 9 additions & 5 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
if err != nil {
if len(remoteVersion) != 0 {
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
f.bi.Version, remoteVersion, err)
}
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
}

// Run migrations; current safe to do in background. That may change in the future.
g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
// Run migrations
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
return dl.Migrate(ctx, bulker)
}))
})
if err = loggedMigration(); err != nil {
return fmt.Errorf("failed to run subsystems: %w", err)
}

// Run schduler for periodic GC/cleanup
// Run scheduler for periodic GC/cleanup
gcCfg := cfg.Inputs[0].Server.GC
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
if err != nil {
Expand Down
111 changes: 65 additions & 46 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"strings"
"time"

"github.com/julienschmidt/httprouter"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
Expand All @@ -25,11 +30,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type HTTPError struct {
Expand Down Expand Up @@ -338,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
Expand All @@ -350,69 +351,81 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

if agent.DefaultAPIKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true)
for _, output := range agent.Outputs {
if output.Type != policy.OutputTypeElasticsearch {
continue
}

err := ack.updateAPIKey(ctx,
zlog,
agent.Id,
currRev, currCoord,
agent.PolicyID,
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds)
if err != nil {
return err
}
}

return nil

}

func (ack *AckT) updateAPIKey(ctx context.Context,
zlog zerolog.Logger,
agentID string,
currRev, currCoord int64,
policyID, apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error {

if apiKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
zlog.Error().
Err(err).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to read API Key roles")
} else {
clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors)
if err != nil {
zlog.Error().
Err(err).
RawJSON("roles", res.RoleDescriptors).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key")
if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", agent.PolicyOutputPermissionsHash).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str("hash.sha256", permissionHash).
Str(LogAPIKeyID, apiKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Msg("Updating agent record to pick up reduced roles.")
}
}
}
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID {
// already updated
continue
}
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
ack.invalidateAPIKeys(ctx, toRetireAPIKeyIDs, apiKeyID)
}

body := makeUpdatePolicyBody(
agent.PolicyID,
policyID,
currRev,
currCoord,
)

err := ack.bulk.Update(
ctx,
dl.FleetAgents,
agent.Id,
agentID,
body,
bulk.WithRefresh(),
bulk.WithRetryOnConflict(3),
)

zlog.Err(err).
Str(LogPolicyID, agent.PolicyID).
Str(LogPolicyID, policyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Msg("ack policy")
Expand Down Expand Up @@ -445,8 +458,25 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {
return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition")
}

func (ack *AckT) invalidateAPIKeys(ctx context.Context, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) {
ids := make([]string, 0, len(toRetireAPIKeyIDs))
for _, k := range toRetireAPIKeyIDs {
if k.ID == skip || k.ID == "" {
continue
}
ids = append(ids, k.ID)
}

if len(ids) > 0 {
log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
apiKeys := agent.APIKeyIDs()
if len(apiKeys) > 0 {
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger()

Expand Down Expand Up @@ -501,17 +531,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return nil
}

func _getAPIKeyIDs(agent *model.Agent) []string {
keys := make([]string, 0, 1)
if agent.AccessAPIKeyID != "" {
keys = append(keys, agent.AccessAPIKeyID)
}
if agent.DefaultAPIKeyID != "" {
keys = append(keys, agent.DefaultAPIKeyID)
}
return keys
}

// Generate an update script that validates that the policy_id
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
Expand Down
55 changes: 54 additions & 1 deletion internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"net/http"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
"github.com/google/go-cmp/cmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -439,3 +440,55 @@ func TestHandleAckEvents(t *testing.T) {
})
}
}

func TestInvalidateAPIKeys(t *testing.T) {
toRetire1 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
}}
toRetire2 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire2_0",
}, {
ID: "toRetire2_1",
}}
var toRetire3 []model.ToRetireAPIKeyIdsItems

skips := map[string]string{
"1": "toRetire1",
"2": "toRetire2_0",
"3": "",
}
wants := map[string][]string{
"1": {},
"2": {"toRetire2_1"},
"3": {},
}

agent := model.Agent{
Outputs: map[string]*model.PolicyOutput{
"1": {ToRetireAPIKeyIds: toRetire1},
"2": {ToRetireAPIKeyIds: toRetire2},
"3": {ToRetireAPIKeyIds: toRetire3},
},
}

for i, out := range agent.Outputs {
skip := skips[i]
want := wants[i]

bulker := ftesting.NewMockBulk()
if len(want) > 0 {
bulker.On("APIKeyInvalidate",
context.Background(), mock.MatchedBy(func(ids []string) bool {
// if A contains B and B contains A => A = B
return assert.Subset(t, ids, want) &&
assert.Subset(t, want, ids)
})).
Return(nil)
}

ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), out.ToRetireAPIKeyIds, skip)

bulker.AssertExpectations(t)
}
}
15 changes: 7 additions & 8 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"reflect"
Expand Down Expand Up @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
Logger()

err := rt.ct.handleCheckin(&zlog, w, r, id)

if err != nil {
cntCheckin.IncError(err)
resp := NewHTTPErrResp(err)
Expand Down Expand Up @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin
//
func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) {
zlog = zlog.With().
Str("ctx", "processPolicy").
Int64("policyRevision", pp.Policy.RevisionIdx).
Int64("policyCoordinator", pp.Policy.CoordinatorIdx).
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

// Repull and decode the agent object. Do not trust the cache.
// Repull and decode the agent object. Do not trust the cache.
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID)
if err != nil {
zlog.Error().Err(err).Msg("fail find agent record")
Expand All @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Parse the outputs maps in order to prepare the outputs
const outputsProperty = "outputs"
outputs, err := smap.Parse(pp.Fields[outputsProperty])

if err != nil {
return nil, err
}
Expand All @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Iterate through the policy outputs and prepare them
for _, policyOutput := range pp.Outputs {
err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to prepare output %q: %w",
policyOutput.Name, err)
}
}

Expand Down
11 changes: 8 additions & 3 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type EnrollerT struct {
}

func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {

log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Setting config enroll_limit")
Expand Down Expand Up @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger,
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver)
}

func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) _enroll(
ctx context.Context,
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
ver string) (*EnrollResponse, error) {

if req.SharedID != "" {
// TODO: Support pre-existing install
Expand Down Expand Up @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) (
agentID,
"",
[]byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentID, apikey.TypeAccess),
apikey.NewMetadata(agentID, "", apikey.TypeAccess),
)
}

Expand Down
Loading

0 comments on commit ca9041f

Please sign in to comment.