From 59967f730064399346618ff1e0836db072b42458 Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Thu, 29 Sep 2022 11:24:43 -0700 Subject: [PATCH] Cleanup cmd/fleet/main.go (#1886) * Replace cache.Config with config.Cache * Move server setup from cmd/fleet to new pkg/server * Move constants * Fix imports and integration tests * fix linter --- cmd/fleet/main.go | 880 +-------------------- cmd/fleet/main_integration_test.go | 3 +- cmd/fleet/server_integration_test.go | 5 +- internal/pkg/api/handleAck_test.go | 2 +- internal/pkg/api/handleStatus_test.go | 2 +- internal/pkg/api/router_test.go | 2 +- internal/pkg/bulk/bulk_integration_test.go | 2 +- internal/pkg/cache/cache.go | 30 +- internal/pkg/cache/impl_integration.go | 4 +- internal/pkg/cache/impl_ristretto.go | 4 +- internal/pkg/config/cache.go | 27 + internal/pkg/config/config.go | 62 ++ internal/pkg/policy/parsed_policy_test.go | 7 +- internal/pkg/server/agent.go | 247 ++++++ internal/pkg/server/fleet.go | 587 ++++++++++++++ internal/pkg/server/server.go | 15 + 16 files changed, 969 insertions(+), 910 deletions(-) create mode 100644 internal/pkg/server/agent.go create mode 100644 internal/pkg/server/fleet.go create mode 100644 internal/pkg/server/server.go diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index e61113820..ddd6c03ae 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -8,58 +8,25 @@ package fleet import ( "context" "errors" - "fmt" - "io" - "net/url" "os" - "reflect" - "runtime/debug" - "sync" - "time" "go.elastic.co/apm" - apmtransport "go.elastic.co/apm/transport" - "github.com/elastic/go-ucfg" "github.com/elastic/go-ucfg/yaml" - "github.com/elastic/fleet-server/v7/internal/pkg/action" - "github.com/elastic/fleet-server/v7/internal/pkg/api" "github.com/elastic/fleet-server/v7/internal/pkg/build" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/cache" - "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/coordinator" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/gc" "github.com/elastic/fleet-server/v7/internal/pkg/logger" - "github.com/elastic/fleet-server/v7/internal/pkg/monitor" - "github.com/elastic/fleet-server/v7/internal/pkg/policy" - "github.com/elastic/fleet-server/v7/internal/pkg/profile" - "github.com/elastic/fleet-server/v7/internal/pkg/reload" - "github.com/elastic/fleet-server/v7/internal/pkg/scheduler" + "github.com/elastic/fleet-server/v7/internal/pkg/server" "github.com/elastic/fleet-server/v7/internal/pkg/signal" - "github.com/elastic/fleet-server/v7/internal/pkg/sleep" "github.com/elastic/fleet-server/v7/internal/pkg/status" - "github.com/elastic/fleet-server/v7/internal/pkg/ver" - "github.com/hashicorp/go-version" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" - - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) const ( - kAgentMode = "agent-mode" - 
kAgentModeRestartLoopDelay = 2 * time.Second - - kUAFleetServer = "Fleet-Server" + kAgentMode = "agent-mode" ) func init() { @@ -72,26 +39,6 @@ func installSignalHandler() context.Context { return signal.HandleInterrupt(rootCtx) } -func makeCache(cfg *config.Config) (cache.Cache, error) { - cacheCfg := makeCacheConfig(cfg) - log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options") - return cache.New(cacheCfg) -} - -func makeCacheConfig(cfg *config.Config) cache.Config { - ccfg := cfg.Inputs[0].Cache - - return cache.Config{ - NumCounters: ccfg.NumCounters, - MaxCost: ccfg.MaxCost, - ActionTTL: ccfg.ActionTTL, - EnrollKeyTTL: ccfg.EnrollKeyTTL, - ArtifactTTL: ccfg.ArtifactTTL, - APIKeyTTL: ccfg.APIKeyTTL, - APIKeyJitter: ccfg.APIKeyJitter, - } -} - func initLogger(cfg *config.Config, version, commit string) (*logger.Logger, error) { l, err := logger.Init(cfg, build.ServiceName) if err != nil { @@ -122,7 +69,7 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error } var l *logger.Logger - var runErr error + var srv server.Server if agentMode { cfg, err := config.FromConfig(cliCfg) if err != nil { @@ -133,12 +80,10 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error return err } - agent, err := NewAgentMode(cliCfg, os.Stdin, bi, l) + srv, err = server.NewAgent(cliCfg, os.Stdin, bi, l) if err != nil { return err } - - runErr = agent.Run(installSignalHandler()) } else { cfgPath, err := cmd.Flags().GetString("config") if err != nil { @@ -162,18 +107,16 @@ func getRunCommand(bi build.Info) func(cmd *cobra.Command, args []string) error return err } - srv, err := NewFleetServer(cfg, bi, status.NewLog()) + srv, err = server.NewFleet(cfg, bi, status.NewLog()) if err != nil { return err } - - runErr = srv.Run(installSignalHandler()) } - if runErr != nil && !errors.Is(runErr, context.Canceled) { - log.Error().Err(runErr).Msg("Exiting") + if err := srv.Run(installSignalHandler()); err != nil && !errors.Is(err, context.Canceled) { + log.Error().Err(err).Msg("Exiting") l.Sync() - return runErr + return err } l.Sync() return nil @@ -191,810 +134,3 @@ func NewCommand(bi build.Info) *cobra.Command { cmd.Flags().VarP(config.NewFlag(), "E", "E", "Overwrite configuration value") return cmd } - -type firstCfg struct { - cfg *config.Config - err error -} - -type AgentMode struct { - cliCfg *ucfg.Config - bi build.Info - reloadables []reload.Reloadable - - agent client.Client - - mux sync.Mutex - firstCfg chan firstCfg - srv *FleetServer - srvCtx context.Context - srvCanceller context.CancelFunc - startChan chan struct{} -} - -func NewAgentMode(cliCfg *ucfg.Config, reader io.Reader, bi build.Info, reloadables ...reload.Reloadable) (*AgentMode, error) { - var err error - - a := &AgentMode{ - cliCfg: cliCfg, - bi: bi, - reloadables: reloadables, - } - a.agent, err = client.NewFromReader(reader, a) - if err != nil { - return nil, err - } - return a, nil -} - -func (a *AgentMode) Run(ctx context.Context) error { - ctx, canceller := context.WithCancel(ctx) - defer canceller() - - a.firstCfg = make(chan firstCfg) - a.startChan = make(chan struct{}, 1) - log.Info().Msg("starting communication connection back to Elastic Agent") - err := a.agent.Start(ctx) - if err != nil { - return err - } - - // wait for the initial configuration to be sent from the - // Elastic Agent before starting the actual Fleet Server. 
- log.Info().Msg("waiting for Elastic Agent to send initial configuration") - var cfg firstCfg - select { - case <-ctx.Done(): - return fmt.Errorf("never received initial configuration: %w", ctx.Err()) - case cfg = <-a.firstCfg: - } - - // possible that first configuration resulted in an error - if cfg.err != nil { - // unblock startChan even though there was an error - a.startChan <- struct{}{} - return cfg.err - } - - // start fleet server with the initial configuration and its - // own context (needed so when OnStop occurs the fleet server - // is stopped and not the elastic-agent-client as well) - srvCtx, srvCancel := context.WithCancel(ctx) - defer srvCancel() - log.Info().Msg("received initial configuration starting Fleet Server") - srv, err := NewFleetServer(cfg.cfg, a.bi, status.NewChained(status.NewLog(), a.agent)) - if err != nil { - // unblock startChan even though there was an error - a.startChan <- struct{}{} - return err - } - a.mux.Lock() - close(a.firstCfg) - a.firstCfg = nil - a.srv = srv - a.srvCtx = srvCtx - a.srvCanceller = srvCancel - a.mux.Unlock() - - // trigger startChan so OnConfig can continue - a.startChan <- struct{}{} - - // keep trying to restart the FleetServer on failure, reporting - // the status back to Elastic Agent - res := make(chan error) - go func() { - for { - err := a.srv.Run(srvCtx) - if err == nil || errors.Is(err, context.Canceled) { - res <- err - return - } - // sleep some before calling Run again - _ = sleep.WithContext(srvCtx, kAgentModeRestartLoopDelay) - } - }() - return <-res -} - -func (a *AgentMode) OnConfig(s string) { - a.mux.Lock() - cliCfg := ucfg.MustNewFrom(a.cliCfg, config.DefaultOptions...) - srv := a.srv - ctx := a.srvCtx - canceller := a.srvCanceller - cfgChan := a.firstCfg - startChan := a.startChan - a.mux.Unlock() - - var cfg *config.Config - var err error - defer func() { - if err != nil { - if cfgChan != nil { - // failure on first config - cfgChan <- firstCfg{ - cfg: nil, - err: err, - } - // block until startChan signalled - <-startChan - return - } - - log.Err(err).Msg("failed to reload configuration") - if canceller != nil { - canceller() - } - } - }() - - // load configuration and then merge it on top of the CLI configuration - var cfgData *ucfg.Config - cfgData, err = yaml.NewConfig([]byte(s), config.DefaultOptions...) - if err != nil { - return - } - err = cliCfg.Merge(cfgData, config.DefaultOptions...) - if err != nil { - return - } - cfg, err = config.FromConfig(cliCfg) - if err != nil { - return - } - - if cfgChan != nil { - // reload the generic reloadables - for _, r := range a.reloadables { - err = r.Reload(ctx, cfg) - if err != nil { - return - } - } - - // send starting configuration so Fleet Server can start - cfgChan <- firstCfg{ - cfg: cfg, - err: nil, - } - - // block handling more OnConfig calls until the Fleet Server - // has been fully started - <-startChan - } else if srv != nil { - // reload the generic reloadables - for _, r := range a.reloadables { - err = r.Reload(ctx, cfg) - if err != nil { - return - } - } - - // reload the server - err = srv.Reload(ctx, cfg) - if err != nil { - return - } - } else { - err = fmt.Errorf("internal service should have been started") - return - } -} - -func (a *AgentMode) OnStop() { - a.mux.Lock() - canceller := a.srvCanceller - a.mux.Unlock() - - if canceller != nil { - canceller() - } -} - -func (a *AgentMode) OnError(err error) { - // Log communication error through the logger. These errors are only - // provided for logging purposes. 
The elastic-agent-client handles - // retries and reconnects internally automatically. - log.Err(err) -} - -type FleetServer struct { - bi build.Info - verCon version.Constraints - - cfg *config.Config - cfgCh chan *config.Config - cache cache.Cache - reporter status.Reporter -} - -// NewFleetServer creates the actual fleet server service. -func NewFleetServer(cfg *config.Config, bi build.Info, reporter status.Reporter) (*FleetServer, error) { - verCon, err := api.BuildVersionConstraint(bi.Version) - if err != nil { - return nil, err - } - - err = cfg.LoadServerLimits() - if err != nil { - return nil, fmt.Errorf("encountered error while loading server limits: %w", err) - } - cache, err := makeCache(cfg) - if err != nil { - return nil, err - } - - return &FleetServer{ - bi: bi, - verCon: verCon, - cfg: cfg, - cfgCh: make(chan *config.Config, 1), - cache: cache, - reporter: reporter, - }, nil -} - -type runFunc func(context.Context) error - -// Run runs the fleet server -func (f *FleetServer) Run(ctx context.Context) error { - var curCfg *config.Config - newCfg := f.cfg - - // Replace context with cancellable ctx - // in order to automatically cancel all the go routines - // that were started in the scope of this function on function exit - ctx, cn := context.WithCancel(ctx) - defer cn() - - stop := func(cn context.CancelFunc, g *errgroup.Group) { - if cn != nil { - cn() - } - if g != nil { - err := g.Wait() - if err != nil { - log.Error().Err(err).Msg("error encountered while stopping server") - } - } - } - - start := func(ctx context.Context, runfn runFunc, ech chan<- error) (*errgroup.Group, context.CancelFunc) { - ctx, cn = context.WithCancel(ctx) - g, ctx := errgroup.WithContext(ctx) - - g.Go(func() error { - err := runfn(ctx) - if err != nil { - ech <- err - } - return err - }) - return g, cn - } - - var ( - proCancel, srvCancel context.CancelFunc - proEg, srvEg *errgroup.Group - ) - - started := false - -LOOP: - for { - ech := make(chan error, 2) - if started { - f.reporter.Status(proto.StateObserved_CONFIGURING, "Re-configuring", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? - } else { - started = true - f.reporter.Status(proto.StateObserved_STARTING, "Starting", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? 
- } - - err := newCfg.LoadServerLimits() - if err != nil { - return fmt.Errorf("encountered error while loading server limits: %w", err) - } - - // Create or recreate cache - if configCacheChanged(curCfg, newCfg) { - log.Info().Msg("reconfigure cache on configuration change") - cacheCfg := makeCacheConfig(newCfg) - err := f.cache.Reconfigure(cacheCfg) - log.Info().Err(err).Interface("cfg", cacheCfg).Msg("reconfigure cache complete") - if err != nil { - return err - } - } - - // Start or restart profiler - if configChangedProfiler(curCfg, newCfg) { - if proCancel != nil { - log.Info().Msg("stopping profiler on configuration change") - stop(proCancel, proEg) - } - proEg, proCancel = nil, nil - if newCfg.Inputs[0].Server.Profiler.Enabled { - log.Info().Msg("starting profiler on configuration change") - proEg, proCancel = start(ctx, func(ctx context.Context) error { - return profile.RunProfiler(ctx, newCfg.Inputs[0].Server.Profiler.Bind) - }, ech) - } - } - - // Start or restart server - if configChangedServer(curCfg, newCfg) { - if srvCancel != nil { - log.Info().Msg("stopping server on configuration change") - stop(srvCancel, srvEg) - } - log.Info().Msg("starting server on configuration change") - srvEg, srvCancel = start(ctx, func(ctx context.Context) error { - return f.runServer(ctx, newCfg) - }, ech) - } - - curCfg = newCfg - f.cfg = curCfg - - select { - case newCfg = <-f.cfgCh: - log.Info().Msg("Server configuration update") - case err := <-ech: - f.reporter.Status(proto.StateObserved_FAILED, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails? - log.Error().Err(err).Msg("Fleet Server failed") - return err - case <-ctx.Done(): - f.reporter.Status(proto.StateObserved_STOPPING, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? - break LOOP - } - } - - // Server is coming down; wait for the server group to exit cleanly. - // Timeout if something is locked up. 
- err := safeWait(srvEg, time.Second) - - // Eat cancel error to minimize confusion in logs - if errors.Is(err, context.Canceled) { - err = nil - } - - log.Info().Err(err).Msg("Fleet Server exited") - return err -} - -func configChangedProfiler(curCfg, newCfg *config.Config) bool { - - changed := true - - switch { - case curCfg == nil: - case curCfg.Inputs[0].Server.Profiler.Enabled != newCfg.Inputs[0].Server.Profiler.Enabled: - case curCfg.Inputs[0].Server.Profiler.Bind != newCfg.Inputs[0].Server.Profiler.Bind: - default: - changed = false - } - - return changed -} - -func redactOutputCfg(cfg *config.Config) config.Output { - const kRedacted = "[redacted]" - redacted := cfg.Output - - if redacted.Elasticsearch.APIKey != "" { - redacted.Elasticsearch.APIKey = kRedacted - } - - if redacted.Elasticsearch.ServiceToken != "" { - redacted.Elasticsearch.ServiceToken = kRedacted - } - - if redacted.Elasticsearch.TLS != nil { - newTLS := *redacted.Elasticsearch.TLS - - if newTLS.Certificate.Key != "" { - newTLS.Certificate.Key = kRedacted - } - if newTLS.Certificate.Passphrase != "" { - newTLS.Certificate.Passphrase = kRedacted - } - - redacted.Elasticsearch.TLS = &newTLS - } - - return redacted -} - -func redactServerCfg(cfg *config.Config) config.Server { - const kRedacted = "[redacted]" - redacted := cfg.Inputs[0].Server - - if redacted.TLS != nil { - newTLS := *redacted.TLS - - if newTLS.Certificate.Key != "" { - newTLS.Certificate.Key = kRedacted - } - if newTLS.Certificate.Passphrase != "" { - newTLS.Certificate.Passphrase = kRedacted - } - - redacted.TLS = &newTLS - } - - return redacted -} - -func redactConfig(cfg *config.Config) *config.Config { - redacted := &config.Config{ - Fleet: cfg.Fleet, - Output: cfg.Output, - Inputs: make([]config.Input, 1), - Logging: cfg.Logging, - HTTP: cfg.HTTP, - } - redacted.Inputs[0].Server = redactServerCfg(cfg) - redacted.Output = redactOutputCfg(cfg) - return redacted -} - -func configChangedServer(curCfg, newCfg *config.Config) bool { - - zlog := log.With().Interface("new", redactConfig(newCfg)).Logger() - - changed := true - switch { - case curCfg == nil: - zlog.Info().Msg("initial server configuration") - case !reflect.DeepEqual(curCfg.Fleet, newCfg.Fleet): - zlog.Info(). - Interface("old", redactConfig(curCfg)). - Msg("fleet configuration has changed") - case !reflect.DeepEqual(curCfg.Output, newCfg.Output): - zlog.Info(). - Interface("old", redactConfig(curCfg)). - Msg("output configuration has changed") - case !reflect.DeepEqual(curCfg.Inputs[0].Server, newCfg.Inputs[0].Server): - zlog.Info(). - Interface("old", redactConfig(curCfg)). 
- Msg("server configuration has changed") - default: - changed = false - } - - return changed -} - -func configCacheChanged(curCfg, newCfg *config.Config) bool { - if curCfg == nil { - return false - } - return curCfg.Inputs[0].Cache != newCfg.Inputs[0].Cache -} - -func safeWait(g *errgroup.Group, to time.Duration) error { - var err error - waitCh := make(chan error) - go func() { - waitCh <- g.Wait() - }() - - select { - case err = <-waitCh: - case <-time.After(to): - log.Warn().Msg("deadlock: goroutine locked up on errgroup.Wait()") - err = errors.New("group wait timeout") - } - - return err -} - -func loggedRunFunc(ctx context.Context, tag string, runfn runFunc) func() error { - return func() error { - - log.Debug().Msg(tag + " started") - - err := runfn(ctx) - - lvl := zerolog.DebugLevel - switch { - case err == nil: - case errors.Is(err, context.Canceled): - err = nil - default: - lvl = zerolog.ErrorLevel - } - - log.WithLevel(lvl).Err(err).Msg(tag + " exited") - return err - } -} - -func initRuntime(cfg *config.Config) { - gcPercent := cfg.Inputs[0].Server.Runtime.GCPercent - if gcPercent != 0 { - old := debug.SetGCPercent(gcPercent) - - log.Info(). - Int("old", old). - Int("new", gcPercent). - Msg("SetGCPercent") - } -} - -func (f *FleetServer) initBulker(ctx context.Context, tracer *apm.Tracer, cfg *config.Config) (*bulk.Bulker, error) { - es, err := es.NewClient(ctx, cfg, false, elasticsearchOptions( - cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, - )...) - if err != nil { - return nil, err - } - - blk := bulk.NewBulker(es, tracer, bulk.BulkOptsFromCfg(cfg)...) - return blk, nil -} - -func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err error) { - initRuntime(cfg) - - // The metricsServer is only enabled if http.enabled is set in the config - metricsServer, err := api.InitMetrics(ctx, cfg, f.bi) - switch { - case err != nil: - return err - case metricsServer != nil: - defer func() { - _ = metricsServer.Stop() - }() - } - - // Bulker is started in its own context and managed in the scope of this function. This is done so - // when the `ctx` is cancelled, the bulker will remain executing until this function exits. - // This allows the child subsystems to continue to write to the data store while tearing down. - bulkCtx, bulkCancel := context.WithCancel(context.Background()) - defer bulkCancel() - - // Create the APM tracer. - tracer, err := f.initTracer(cfg.Inputs[0].Server.Instrumentation) - if err != nil { - return err - } - - // Create the bulker subsystem - bulker, err := f.initBulker(bulkCtx, tracer, cfg) - if err != nil { - return err - } - - // Execute the bulker engine in a goroutine with its orphaned context. - // Create an error channel for the case where the bulker exits - // unexpectedly (ie. not cancelled by the bulkCancel context). - errCh := make(chan error) - - go func() { - runFunc := loggedRunFunc(bulkCtx, "Bulker", bulker.Run) - - // Emit the error from bulker.Run to the local error channel. - // The error group will be listening for it. (see comments below) - errCh <- runFunc() - }() - - // Wrap context with an error group context to manage the lifecycle - // of the subsystems. An error from any subsystem, or if the - // parent context is cancelled, will cancel the group. - // see https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Go - g, ctx := errgroup.WithContext(ctx) - - // Stub a function for inclusion in the errgroup that exits when - // the bulker exits. 
If the bulker exits before the error group, - // this will tear down the error group and g.Wait() will return. - // Otherwise it will be a noop. - g.Go(func() (err error) { - select { - case err = <-errCh: - case <-ctx.Done(): - err = ctx.Err() - } - return - }) - - if tracer != nil { - go func() { - <-ctx.Done() - log.Info().Msg("flushing instrumentation tracer...") - tracer.Flush(nil) - tracer.Close() - }() - } - - if err = f.runSubsystems(ctx, cfg, g, bulker, tracer); err != nil { - return err - } - - return g.Wait() -} - -func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) { - esCli := bulker.Client() - - // Check version compatibility with Elasticsearch - remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version) - if err != nil { - if len(remoteVersion) != 0 { - return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", - f.bi.Version, remoteVersion, err) - } - return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err) - } - - // Run migrations - loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { - return dl.Migrate(ctx, bulker) - }) - if err = loggedMigration(); err != nil { - return fmt.Errorf("failed to run subsystems: %w", err) - } - - // Run scheduler for periodic GC/cleanup - gcCfg := cfg.Inputs[0].Server.GC - sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval)) - if err != nil { - return fmt.Errorf("failed to create elasticsearch GC: %w", err) - } - g.Go(loggedRunFunc(ctx, "Elasticsearch GC", sched.Run)) - - // Monitoring es client, longer timeout, no retries - monCli, err := es.NewClient(ctx, cfg, true, elasticsearchOptions( - cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, - )...) 
- if err != nil { - return err - } - - // Coordinator policy monitor - pim, err := monitor.New(dl.FleetPolicies, esCli, monCli, - monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), - monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), - ) - if err != nil { - return err - } - - g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run)) - cord := coordinator.NewMonitor(cfg.Fleet, f.bi.Version, bulker, pim, coordinator.NewCoordinatorZero) - g.Go(loggedRunFunc(ctx, "Coordinator policy monitor", cord.Run)) - - // Policy monitor - pm := policy.NewMonitor(bulker, pim, cfg.Inputs[0].Server.Limits.PolicyThrottle) - g.Go(loggedRunFunc(ctx, "Policy monitor", pm.Run)) - - // Policy self monitor - sm := policy.NewSelfMonitor(cfg.Fleet, bulker, pim, cfg.Inputs[0].Policy.ID, f.reporter) - g.Go(loggedRunFunc(ctx, "Policy self monitor", sm.Run)) - - // Actions monitoring - var am monitor.SimpleMonitor - var ad *action.Dispatcher - var tr *action.TokenResolver - - am, err = monitor.NewSimple(dl.FleetActions, esCli, monCli, - monitor.WithExpiration(true), - monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), - monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), - ) - if err != nil { - return err - } - g.Go(loggedRunFunc(ctx, "Revision monitor", am.Run)) - - ad = action.NewDispatcher(am) - g.Go(loggedRunFunc(ctx, "Revision dispatcher", ad.Run)) - tr, err = action.NewTokenResolver(bulker) - if err != nil { - return err - } - - bc := checkin.NewBulk(bulker) - g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) - - ct := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, tr, bulker) - et, err := api.NewEnrollerT(f.verCon, &cfg.Inputs[0].Server, bulker, f.cache) - if err != nil { - return err - } - - at := api.NewArtifactT(&cfg.Inputs[0].Server, bulker, f.cache) - ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache) - st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache) - - router := api.NewRouter(&cfg.Inputs[0].Server, bulker, ct, et, at, ack, st, sm, tracer, f.bi) - - g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error { - return router.Run(ctx) - })) - - return err -} - -// Reload reloads the fleet server with the latest configuration. -func (f *FleetServer) Reload(ctx context.Context, cfg *config.Config) error { - select { - case f.cfgCh <- cfg: - case <-ctx.Done(): - } - return nil -} - -func (f *FleetServer) initTracer(cfg config.Instrumentation) (*apm.Tracer, error) { - if !cfg.Enabled { - return nil, nil - } - - log.Info().Msg("fleet-server instrumentation is enabled") - - // TODO(marclop): Ideally, we'd use apmtransport.NewHTTPTransportOptions() - // but it doesn't exist today. Update this code once we have something - // available via the APM Go agent. 
- const ( - envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" - envServerCert = "ELASTIC_APM_SERVER_CERT" - envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" - envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" - envTransactionSampleRate = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE" - ) - if cfg.TLS.SkipVerify { - os.Setenv(envVerifyServerCert, "false") - defer os.Unsetenv(envVerifyServerCert) - } - if cfg.TLS.ServerCertificate != "" { - os.Setenv(envServerCert, cfg.TLS.ServerCertificate) - defer os.Unsetenv(envServerCert) - } - if cfg.TLS.ServerCA != "" { - os.Setenv(envCACert, cfg.TLS.ServerCA) - defer os.Unsetenv(envCACert) - } - if cfg.GlobalLabels != "" { - os.Setenv(envGlobalLabels, cfg.GlobalLabels) - defer os.Unsetenv(envGlobalLabels) - } - if cfg.TransactionSampleRate != "" { - os.Setenv(envTransactionSampleRate, cfg.TransactionSampleRate) - defer os.Unsetenv(envTransactionSampleRate) - } - transport, err := apmtransport.NewHTTPTransport() - if err != nil { - return nil, err - } - - if len(cfg.Hosts) > 0 { - hosts := make([]*url.URL, 0, len(cfg.Hosts)) - for _, host := range cfg.Hosts { - u, err := url.Parse(host) - if err != nil { - return nil, fmt.Errorf("failed parsing %s: %w", host, err) - } - hosts = append(hosts, u) - } - transport.SetServerURL(hosts...) - } - if cfg.APIKey != "" { - transport.SetAPIKey(cfg.APIKey) - } else { - transport.SetSecretToken(cfg.SecretToken) - } - return apm.NewTracerOptions(apm.TracerOptions{ - ServiceName: "fleet-server", - ServiceVersion: f.bi.Version, - ServiceEnvironment: cfg.Environment, - Transport: transport, - }) -} - -func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption { - options := []es.ConfigOption{es.WithUserAgent(kUAFleetServer, bi)} - if instumented { - options = append(options, es.InstrumentRoundTripper()) - } - return options -} diff --git a/cmd/fleet/main_integration_test.go b/cmd/fleet/main_integration_test.go index d0f842536..914dfb27b 100644 --- a/cmd/fleet/main_integration_test.go +++ b/cmd/fleet/main_integration_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" + fserver "github.com/elastic/fleet-server/v7/internal/pkg/server" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" "github.com/elastic/fleet-server/v7/internal/pkg/testing/suite" ) @@ -117,7 +118,7 @@ func (s *agentSuite) TestAgentMode(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - agent, err := NewAgentMode(ucfg.New(), r, biInfo) + agent, err := fserver.NewAgent(ucfg.New(), r, biInfo) require.NoError(t, err) err = agent.Run(ctx) assert.NoError(t, err) diff --git a/cmd/fleet/server_integration_test.go b/cmd/fleet/server_integration_test.go index 76c279009..31398213b 100644 --- a/cmd/fleet/server_integration_test.go +++ b/cmd/fleet/server_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/logger" + "github.com/elastic/fleet-server/v7/internal/pkg/server" "github.com/elastic/fleet-server/v7/internal/pkg/sleep" "github.com/elastic/fleet-server/v7/internal/pkg/status" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" @@ -45,7 +46,7 @@ const ( type tserver struct { cfg *config.Config g 
*errgroup.Group - srv *FleetServer + srv *server.Fleet } func (s *tserver) baseURL() string { @@ -82,7 +83,7 @@ func startTestServer(ctx context.Context) (*tserver, error) { cfg.Inputs[0].Server = *srvcfg log.Info().Uint16("port", port).Msg("Test fleet server") - srv, err := NewFleetServer(cfg, build.Info{Version: serverVersion}, status.NewLog()) + srv, err := server.NewFleet(cfg, build.Info{Version: serverVersion}, status.NewLog()) if err != nil { return nil, fmt.Errorf("unable to create server: %w", err) } diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 29c678c24..076068535 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -411,7 +411,7 @@ func TestHandleAckEvents(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { logger := testlog.SetLogger(t) - cache, err := cache.New(cache.Config{NumCounters: 100, MaxCost: 100000}) + cache, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000}) if err != nil { t.Fatal(err) } diff --git a/internal/pkg/api/handleStatus_test.go b/internal/pkg/api/handleStatus_test.go index 474ab2a2d..f3d184b82 100644 --- a/internal/pkg/api/handleStatus_test.go +++ b/internal/pkg/api/handleStatus_test.go @@ -51,7 +51,7 @@ func TestHandleStatus(t *testing.T) { cfg := &config.Server{} cfg.InitDefaults() - c, err := cache.New(cache.Config{NumCounters: 100, MaxCost: 100000}) + c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000}) require.NoError(t, err) authfnOk := func(r *http.Request) (*apikey.APIKey, error) { diff --git a/internal/pkg/api/router_test.go b/internal/pkg/api/router_test.go index a874958fd..9844c0ec9 100644 --- a/internal/pkg/api/router_test.go +++ b/internal/pkg/api/router_test.go @@ -38,7 +38,7 @@ func TestRun(t *testing.T) { cfg.Port = port verCon := mustBuildConstraints("8.0.0") - c, err := cache.New(cache.Config{NumCounters: 100, MaxCost: 100000}) + c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000}) require.NoError(t, err) bulker := ftesting.NewMockBulk() pim := mock.NewMockMonitor() diff --git a/internal/pkg/bulk/bulk_integration_test.go b/internal/pkg/bulk/bulk_integration_test.go index cc08642c9..05bb6202d 100644 --- a/internal/pkg/bulk/bulk_integration_test.go +++ b/internal/pkg/bulk/bulk_integration_test.go @@ -265,7 +265,7 @@ func TestBulkSearch(t *testing.T) { } if len(res.Hits) != 1 { - t.Fatal(fmt.Sprintf("hit mismatch: %d", len(res.Hits))) + t.Fatalf("hit mismatch: %d", len(res.Hits)) } var dst3 testT diff --git a/internal/pkg/cache/cache.go b/internal/pkg/cache/cache.go index 67b2075e6..909988702 100644 --- a/internal/pkg/cache/cache.go +++ b/internal/pkg/cache/cache.go @@ -11,15 +11,15 @@ import ( "sync" "time" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/model" ) type Cache interface { - Reconfigure(Config) error + Reconfigure(config.Cache) error SetAction(model.Action) GetAction(id string) (model.Action, bool) @@ -39,37 +39,17 @@ type SecurityInfo = apikey.SecurityInfo type CacheT struct { cache Cacher - cfg Config + cfg config.Cache mut sync.RWMutex } -type Config struct { - NumCounters int64 // number of keys to track frequency of - MaxCost int64 // maximum cost of cache in 'cost' units - ActionTTL time.Duration - APIKeyTTL time.Duration - EnrollKeyTTL time.Duration - 
ArtifactTTL time.Duration - APIKeyJitter time.Duration -} - -func (c *Config) MarshalZerologObject(e *zerolog.Event) { - e.Int64("numCounters", c.NumCounters) - e.Int64("maxCost", c.MaxCost) - e.Dur("actionTTL", c.ActionTTL) - e.Dur("enrollTTL", c.EnrollKeyTTL) - e.Dur("artifactTTL", c.ArtifactTTL) - e.Dur("apiKeyTTL", c.APIKeyTTL) - e.Dur("apiKeyJitter", c.APIKeyJitter) -} - type actionCache struct { actionID string actionType string } // New creates a new cache. -func New(cfg Config) (*CacheT, error) { +func New(cfg config.Cache) (*CacheT, error) { cache, err := newCache(cfg) if err != nil { return nil, err @@ -84,7 +64,7 @@ func New(cfg Config) (*CacheT, error) { } // Reconfigure will drop cache -func (c *CacheT) Reconfigure(cfg Config) error { +func (c *CacheT) Reconfigure(cfg config.Cache) error { c.mut.Lock() defer c.mut.Unlock() diff --git a/internal/pkg/cache/impl_integration.go b/internal/pkg/cache/impl_integration.go index 013b4a6f5..6c418745b 100644 --- a/internal/pkg/cache/impl_integration.go +++ b/internal/pkg/cache/impl_integration.go @@ -9,9 +9,11 @@ package cache import ( "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/config" ) -func newCache(_ Config) (Cacher, error) { +func newCache(_ config.Cache) (Cacher, error) { return &NoCache{}, nil } diff --git a/internal/pkg/cache/impl_ristretto.go b/internal/pkg/cache/impl_ristretto.go index 582ba23e7..b8a38a018 100644 --- a/internal/pkg/cache/impl_ristretto.go +++ b/internal/pkg/cache/impl_ristretto.go @@ -9,9 +9,11 @@ package cache import ( "github.com/dgraph-io/ristretto" + + "github.com/elastic/fleet-server/v7/internal/pkg/config" ) -func newCache(cfg Config) (Cacher, error) { +func newCache(cfg config.Cache) (Cacher, error) { rcfg := &ristretto.Config{ NumCounters: cfg.NumCounters, MaxCost: cfg.MaxCost, diff --git a/internal/pkg/config/cache.go b/internal/pkg/config/cache.go index c3bc5ab39..738a74c85 100644 --- a/internal/pkg/config/cache.go +++ b/internal/pkg/config/cache.go @@ -6,6 +6,8 @@ package config import ( "time" + + "github.com/rs/zerolog" ) const ( @@ -56,3 +58,28 @@ func (c *Cache) LoadLimits(limits *envLimits) { c.APIKeyJitter = defaultAPIKeyJitter } } + +// CopyCache returns a copy of the config's Cache settings +func CopyCache(cfg *Config) Cache { + ccfg := cfg.Inputs[0].Cache + return Cache{ + NumCounters: ccfg.NumCounters, + MaxCost: ccfg.MaxCost, + ActionTTL: ccfg.ActionTTL, + EnrollKeyTTL: ccfg.EnrollKeyTTL, + ArtifactTTL: ccfg.ArtifactTTL, + APIKeyTTL: ccfg.APIKeyTTL, + APIKeyJitter: ccfg.APIKeyJitter, + } +} + +// MarshalZerologObject turns the cache settings into a zerolog event +func (c *Cache) MarshalZerologObject(e *zerolog.Event) { + e.Int64("numCounters", c.NumCounters) + e.Int64("maxCost", c.MaxCost) + e.Dur("actionTTL", c.ActionTTL) + e.Dur("enrollTTL", c.EnrollKeyTTL) + e.Dur("artifactTTL", c.ArtifactTTL) + e.Dur("apiKeyTTL", c.APIKeyTTL) + e.Dur("apiKeyJitter", c.APIKeyJitter) +} diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index ce175a853..827ade339 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -23,6 +23,8 @@ var DefaultOptions = []ucfg.Option{ ucfg.FieldReplaceValues("inputs"), } +const kRedacted = "[redacted]" + // Config is the global configuration. 
type Config struct { Fleet Fleet `config:"fleet"` @@ -106,6 +108,66 @@ func (c *Config) Merge(other *Config) (*Config, error) { return cfg, nil } +func redactOutput(cfg *Config) Output { + redacted := cfg.Output + + if redacted.Elasticsearch.APIKey != "" { + redacted.Elasticsearch.APIKey = kRedacted + } + + if redacted.Elasticsearch.ServiceToken != "" { + redacted.Elasticsearch.ServiceToken = kRedacted + } + + if redacted.Elasticsearch.TLS != nil { + newTLS := *redacted.Elasticsearch.TLS + + if newTLS.Certificate.Key != "" { + newTLS.Certificate.Key = kRedacted + } + if newTLS.Certificate.Passphrase != "" { + newTLS.Certificate.Passphrase = kRedacted + } + + redacted.Elasticsearch.TLS = &newTLS + } + + return redacted +} + +func redactServer(cfg *Config) Server { + redacted := cfg.Inputs[0].Server + + if redacted.TLS != nil { + newTLS := *redacted.TLS + + if newTLS.Certificate.Key != "" { + newTLS.Certificate.Key = kRedacted + } + if newTLS.Certificate.Passphrase != "" { + newTLS.Certificate.Passphrase = kRedacted + } + + redacted.TLS = &newTLS + } + + return redacted +} + +// Redact returns a copy of the config with all sensitive attributes redacted. +func (c *Config) Redact() *Config { + redacted := &Config{ + Fleet: c.Fleet, + Output: c.Output, + Inputs: make([]Input, 1), + Logging: c.Logging, + HTTP: c.HTTP, + } + redacted.Inputs[0].Server = redactServer(c) + redacted.Output = redactOutput(c) + return redacted +} + func checkDeprecatedOptions(deprecatedOpts map[string]string, c *ucfg.Config) { for opt, message := range deprecatedOpts { if c.HasField(opt) { diff --git a/internal/pkg/policy/parsed_policy_test.go b/internal/pkg/policy/parsed_policy_test.go index 32ef271a7..957a24911 100644 --- a/internal/pkg/policy/parsed_policy_test.go +++ b/internal/pkg/policy/parsed_policy_test.go @@ -6,7 +6,6 @@ package policy import ( "encoding/json" - "fmt" "testing" "github.com/elastic/fleet-server/v7/internal/pkg/model" @@ -50,7 +49,7 @@ func TestNewParsedPolicy(t *testing.T) { for _, f := range fields { if _, ok := pp.Fields[f]; !ok { - t.Error(fmt.Sprintf("Missing field %s", f)) + t.Errorf("Missing field %s", f) } } @@ -70,7 +69,7 @@ func TestNewParsedPolicy(t *testing.T) { expectedSha2 := "d4d0840fe28ca4900129a749b56cee729562c0a88c935192c659252b5b0d762a" if defaultOutput.Role.Sha2 != expectedSha2 { - t.Fatal(fmt.Sprintf("Expected sha2: '%s', got '%s'.", expectedSha2, defaultOutput.Role.Sha2)) + t.Fatalf("Expected sha2: '%s', got '%s'.", expectedSha2, defaultOutput.Role.Sha2) } } } @@ -104,7 +103,7 @@ func TestNewParsedPolicyNoES(t *testing.T) { for _, f := range fields { if _, ok := pp.Fields[f]; !ok { - t.Error(fmt.Sprintf("Missing field %s", f)) + t.Errorf("Missing field %s", f) } } diff --git a/internal/pkg/server/agent.go b/internal/pkg/server/agent.go new file mode 100644 index 000000000..56b5adfd5 --- /dev/null +++ b/internal/pkg/server/agent.go @@ -0,0 +1,247 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package server + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/go-ucfg" + "github.com/elastic/go-ucfg/yaml" + + "github.com/elastic/fleet-server/v7/internal/pkg/build" + "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/reload" + "github.com/elastic/fleet-server/v7/internal/pkg/sleep" + "github.com/elastic/fleet-server/v7/internal/pkg/status" + + "github.com/rs/zerolog/log" +) + +const kAgentModeRestartLoopDelay = 2 * time.Second + +type firstCfg struct { + cfg *config.Config + err error +} + +// Agent is a fleet-server that runs under the elastic-agent. +// An Agent instance will retrieve connection information from the passed reader (normally stdin). +// Agent uses client.StateInterface to gather config data and manage its lifecylce. +type Agent struct { + cliCfg *ucfg.Config + bi build.Info + reloadables []reload.Reloadable + + agent client.Client + + mux sync.Mutex + firstCfg chan firstCfg + srv *Fleet + srvCtx context.Context + srvCanceller context.CancelFunc + startChan chan struct{} +} + +// NewAgent returns an Agent that will gather connection information from the passed reader. +func NewAgent(cliCfg *ucfg.Config, reader io.Reader, bi build.Info, reloadables ...reload.Reloadable) (*Agent, error) { + var err error + + a := &Agent{ + cliCfg: cliCfg, + bi: bi, + reloadables: reloadables, + } + a.agent, err = client.NewFromReader(reader, a) + if err != nil { + return nil, err + } + return a, nil +} + +// Run starts a Server instance using config from the configured client. +func (a *Agent) Run(ctx context.Context) error { + ctx, canceller := context.WithCancel(ctx) + defer canceller() + + a.firstCfg = make(chan firstCfg) + a.startChan = make(chan struct{}, 1) + log.Info().Msg("starting communication connection back to Elastic Agent") + err := a.agent.Start(ctx) + if err != nil { + return err + } + + // wait for the initial configuration to be sent from the + // Elastic Agent before starting the actual Fleet Server. 
+ log.Info().Msg("waiting for Elastic Agent to send initial configuration") + var cfg firstCfg + select { + case <-ctx.Done(): + return fmt.Errorf("never received initial configuration: %w", ctx.Err()) + case cfg = <-a.firstCfg: + } + + // possible that first configuration resulted in an error + if cfg.err != nil { + // unblock startChan even though there was an error + a.startChan <- struct{}{} + return cfg.err + } + + // start fleet server with the initial configuration and its + // own context (needed so when OnStop occurs the fleet server + // is stopped and not the elastic-agent-client as well) + srvCtx, srvCancel := context.WithCancel(ctx) + defer srvCancel() + log.Info().Msg("received initial configuration starting Fleet Server") + srv, err := NewFleet(cfg.cfg, a.bi, status.NewChained(status.NewLog(), a.agent)) + if err != nil { + // unblock startChan even though there was an error + a.startChan <- struct{}{} + return err + } + a.mux.Lock() + close(a.firstCfg) + a.firstCfg = nil + a.srv = srv + a.srvCtx = srvCtx + a.srvCanceller = srvCancel + a.mux.Unlock() + + // trigger startChan so OnConfig can continue + a.startChan <- struct{}{} + + // keep trying to restart the FleetServer on failure, reporting + // the status back to Elastic Agent + res := make(chan error) + go func() { + for { + err := a.srv.Run(srvCtx) + if err == nil || errors.Is(err, context.Canceled) { + res <- err + return + } + // sleep some before calling Run again + _ = sleep.WithContext(srvCtx, kAgentModeRestartLoopDelay) + } + }() + return <-res +} + +// OnConfig defines what the fleet-server running under the elastic-agent does when it receives a new config. +// This is part of the client.StateInterface definition. +func (a *Agent) OnConfig(s string) { + a.mux.Lock() + cliCfg := ucfg.MustNewFrom(a.cliCfg, config.DefaultOptions...) + srv := a.srv + ctx := a.srvCtx + canceller := a.srvCanceller + cfgChan := a.firstCfg + startChan := a.startChan + a.mux.Unlock() + + var cfg *config.Config + var err error + defer func() { + if err != nil { + if cfgChan != nil { + // failure on first config + cfgChan <- firstCfg{ + cfg: nil, + err: err, + } + // block until startChan signalled + <-startChan + return + } + + log.Err(err).Msg("failed to reload configuration") + if canceller != nil { + canceller() + } + } + }() + + // load configuration and then merge it on top of the CLI configuration + var cfgData *ucfg.Config + cfgData, err = yaml.NewConfig([]byte(s), config.DefaultOptions...) + if err != nil { + return + } + err = cliCfg.Merge(cfgData, config.DefaultOptions...) + if err != nil { + return + } + cfg, err = config.FromConfig(cliCfg) + if err != nil { + return + } + + // Pass config if it's the initial config on startup + // TODO maybe use sync.Once to make it clear that this block only occurs on startup? + if cfgChan != nil { + // reload the generic reloadables + for _, r := range a.reloadables { + err = r.Reload(ctx, cfg) + if err != nil { + return + } + } + + // send starting configuration so Fleet Server can start + cfgChan <- firstCfg{ + cfg: cfg, + err: nil, + } + + // block handling more OnConfig calls until the Fleet Server + // has been fully started + <-startChan + } else if srv != nil { // Reload config if the server is running. 
+ // reload the generic reloadables + for _, r := range a.reloadables { + err = r.Reload(ctx, cfg) + if err != nil { + return + } + } + + // reload the server + err = srv.Reload(ctx, cfg) + if err != nil { + return + } + } else { + err = fmt.Errorf("internal service should have been started") + return + } +} + +// OnStop defines what the fleet-server running under the elastic-agent does when the agent sends a stop signal. +// This is part of the client.StateInterface definition. +// The root context will be cancelled to stop. +func (a *Agent) OnStop() { + a.mux.Lock() + canceller := a.srvCanceller + a.mux.Unlock() + + if canceller != nil { + canceller() + } +} + +// OnError defines what the fleet-server running under the elastic-agent does when there is an error communicating with the elastic-agent. +// This is part of the client.StateInterface definition. +// Communication errors will be logged. The elastic-agent-client handles +// retries and reconnects internally automatically. +func (a *Agent) OnError(err error) { + log.Err(err) +} diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go new file mode 100644 index 000000000..fecc81b6d --- /dev/null +++ b/internal/pkg/server/fleet.go @@ -0,0 +1,587 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "errors" + "fmt" + "net/url" + "os" + "reflect" + "runtime/debug" + "time" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "go.elastic.co/apm" + apmtransport "go.elastic.co/apm/transport" + + "github.com/elastic/fleet-server/v7/internal/pkg/action" + "github.com/elastic/fleet-server/v7/internal/pkg/api" + "github.com/elastic/fleet-server/v7/internal/pkg/build" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/cache" + "github.com/elastic/fleet-server/v7/internal/pkg/checkin" + "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/coordinator" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/gc" + "github.com/elastic/fleet-server/v7/internal/pkg/monitor" + "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/elastic/fleet-server/v7/internal/pkg/profile" + "github.com/elastic/fleet-server/v7/internal/pkg/scheduler" + "github.com/elastic/fleet-server/v7/internal/pkg/status" + "github.com/elastic/fleet-server/v7/internal/pkg/ver" + + "github.com/hashicorp/go-version" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" +) + +const kUAFleetServer = "Fleet-Server" + +// Fleet is an instance of the fleet-server. +type Fleet struct { + bi build.Info + verCon version.Constraints + + cfg *config.Config + cfgCh chan *config.Config + cache cache.Cache + reporter status.Reporter +} + +// NewFleet creates the actual fleet server service. 
+func NewFleet(cfg *config.Config, bi build.Info, reporter status.Reporter) (*Fleet, error) { + verCon, err := api.BuildVersionConstraint(bi.Version) + if err != nil { + return nil, err + } + + err = cfg.LoadServerLimits() + if err != nil { + return nil, fmt.Errorf("encountered error while loading server limits: %w", err) + } + + cacheCfg := config.CopyCache(cfg) + log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options") + cache, err := cache.New(cacheCfg) + if err != nil { + return nil, err + } + + return &Fleet{ + bi: bi, + verCon: verCon, + cfg: cfg, + cfgCh: make(chan *config.Config, 1), + cache: cache, + reporter: reporter, + }, nil +} + +type runFunc func(context.Context) error + +// Run runs the fleet server +func (f *Fleet) Run(ctx context.Context) error { + var curCfg *config.Config + newCfg := f.cfg + + // Replace context with cancellable ctx + // in order to automatically cancel all the go routines + // that were started in the scope of this function on function exit + ctx, cn := context.WithCancel(ctx) + defer cn() + + stop := func(cn context.CancelFunc, g *errgroup.Group) { + if cn != nil { + cn() + } + if g != nil { + err := g.Wait() + if err != nil { + log.Error().Err(err).Msg("error encountered while stopping server") + } + } + } + + start := func(ctx context.Context, runfn runFunc, ech chan<- error) (*errgroup.Group, context.CancelFunc) { + ctx, cn = context.WithCancel(ctx) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + err := runfn(ctx) + if err != nil { + ech <- err + } + return err + }) + return g, cn + } + + var ( + proCancel, srvCancel context.CancelFunc + proEg, srvEg *errgroup.Group + ) + + started := false + +LOOP: + for { + ech := make(chan error, 2) + if started { + f.reporter.Status(proto.StateObserved_CONFIGURING, "Re-configuring", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? + } else { + started = true + f.reporter.Status(proto.StateObserved_STARTING, "Starting", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? 
+ } + + err := newCfg.LoadServerLimits() + if err != nil { + return fmt.Errorf("encountered error while loading server limits: %w", err) + } + + // Create or recreate cache + if configCacheChanged(curCfg, newCfg) { + log.Info().Msg("reconfigure cache on configuration change") + cacheCfg := config.CopyCache(newCfg) + err := f.cache.Reconfigure(cacheCfg) + log.Info().Err(err).Interface("cfg", cacheCfg).Msg("reconfigure cache complete") + if err != nil { + return err + } + } + + // Start or restart profiler + if configChangedProfiler(curCfg, newCfg) { + if proCancel != nil { + log.Info().Msg("stopping profiler on configuration change") + stop(proCancel, proEg) + } + proEg, proCancel = nil, nil + if newCfg.Inputs[0].Server.Profiler.Enabled { + log.Info().Msg("starting profiler on configuration change") + proEg, proCancel = start(ctx, func(ctx context.Context) error { + return profile.RunProfiler(ctx, newCfg.Inputs[0].Server.Profiler.Bind) + }, ech) + } + } + + // Start or restart server + if configChangedServer(curCfg, newCfg) { + if srvCancel != nil { + log.Info().Msg("stopping server on configuration change") + stop(srvCancel, srvEg) + } + log.Info().Msg("starting server on configuration change") + srvEg, srvCancel = start(ctx, func(ctx context.Context) error { + return f.runServer(ctx, newCfg) + }, ech) + } + + curCfg = newCfg + f.cfg = curCfg + + select { + case newCfg = <-f.cfgCh: + log.Info().Msg("Server configuration update") + case err := <-ech: + f.reporter.Status(proto.StateObserved_FAILED, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails? + log.Error().Err(err).Msg("Fleet Server failed") + return err + case <-ctx.Done(): + f.reporter.Status(proto.StateObserved_STOPPING, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? + break LOOP + } + } + + // Server is coming down; wait for the server group to exit cleanly. + // Timeout if something is locked up. + err := safeWait(srvEg, time.Second) + + // Eat cancel error to minimize confusion in logs + if errors.Is(err, context.Canceled) { + err = nil + } + + log.Info().Err(err).Msg("Fleet Server exited") + return err +} + +func configChangedProfiler(curCfg, newCfg *config.Config) bool { + changed := true + + switch { + case curCfg == nil: + case curCfg.Inputs[0].Server.Profiler.Enabled != newCfg.Inputs[0].Server.Profiler.Enabled: + case curCfg.Inputs[0].Server.Profiler.Bind != newCfg.Inputs[0].Server.Profiler.Bind: + default: + changed = false + } + + return changed +} + +func configCacheChanged(curCfg, newCfg *config.Config) bool { + if curCfg == nil { + return false + } + return curCfg.Inputs[0].Cache != newCfg.Inputs[0].Cache +} + +func configChangedServer(curCfg, newCfg *config.Config) bool { + zlog := log.With().Interface("new", newCfg.Redact()).Logger() + + changed := true + switch { + case curCfg == nil: + zlog.Info().Msg("initial server configuration") + case !reflect.DeepEqual(curCfg.Fleet, newCfg.Fleet): + zlog.Info(). + Interface("old", curCfg.Redact()). + Msg("fleet configuration has changed") + case !reflect.DeepEqual(curCfg.Output, newCfg.Output): + zlog.Info(). + Interface("old", curCfg.Redact()). + Msg("output configuration has changed") + case !reflect.DeepEqual(curCfg.Inputs[0].Server, newCfg.Inputs[0].Server): + zlog.Info(). + Interface("old", curCfg.Redact()). 
+ Msg("server configuration has changed") + default: + changed = false + } + + return changed +} + +func safeWait(g *errgroup.Group, to time.Duration) error { + var err error + waitCh := make(chan error) + go func() { + waitCh <- g.Wait() + }() + + select { + case err = <-waitCh: + case <-time.After(to): + log.Warn().Msg("deadlock: goroutine locked up on errgroup.Wait()") + err = errors.New("group wait timeout") + } + + return err +} + +func loggedRunFunc(ctx context.Context, tag string, runfn runFunc) func() error { + return func() error { + + log.Debug().Msg(tag + " started") + + err := runfn(ctx) + + lvl := zerolog.DebugLevel + switch { + case err == nil: + case errors.Is(err, context.Canceled): + err = nil + default: + lvl = zerolog.ErrorLevel + } + + log.WithLevel(lvl).Err(err).Msg(tag + " exited") + return err + } +} + +func initRuntime(cfg *config.Config) { + gcPercent := cfg.Inputs[0].Server.Runtime.GCPercent + if gcPercent != 0 { + old := debug.SetGCPercent(gcPercent) + + log.Info(). + Int("old", old). + Int("new", gcPercent). + Msg("SetGCPercent") + } +} + +func (f *Fleet) initBulker(ctx context.Context, tracer *apm.Tracer, cfg *config.Config) (*bulk.Bulker, error) { + es, err := es.NewClient(ctx, cfg, false, elasticsearchOptions( + cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, + )...) + if err != nil { + return nil, err + } + + blk := bulk.NewBulker(es, tracer, bulk.BulkOptsFromCfg(cfg)...) + return blk, nil +} + +func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { + initRuntime(cfg) + + // The metricsServer is only enabled if http.enabled is set in the config + metricsServer, err := api.InitMetrics(ctx, cfg, f.bi) + switch { + case err != nil: + return err + case metricsServer != nil: + defer func() { + _ = metricsServer.Stop() + }() + } + + // Bulker is started in its own context and managed in the scope of this function. This is done so + // when the `ctx` is cancelled, the bulker will remain executing until this function exits. + // This allows the child subsystems to continue to write to the data store while tearing down. + bulkCtx, bulkCancel := context.WithCancel(context.Background()) + defer bulkCancel() + + // Create the APM tracer. + tracer, err := f.initTracer(cfg.Inputs[0].Server.Instrumentation) + if err != nil { + return err + } + + // Create the bulker subsystem + bulker, err := f.initBulker(bulkCtx, tracer, cfg) + if err != nil { + return err + } + + // Execute the bulker engine in a goroutine with its orphaned context. + // Create an error channel for the case where the bulker exits + // unexpectedly (ie. not cancelled by the bulkCancel context). + errCh := make(chan error) + + go func() { + runFunc := loggedRunFunc(bulkCtx, "Bulker", bulker.Run) + + // Emit the error from bulker.Run to the local error channel. + // The error group will be listening for it. (see comments below) + errCh <- runFunc() + }() + + // Wrap context with an error group context to manage the lifecycle + // of the subsystems. An error from any subsystem, or if the + // parent context is cancelled, will cancel the group. + // see https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Go + g, ctx := errgroup.WithContext(ctx) + + // Stub a function for inclusion in the errgroup that exits when + // the bulker exits. If the bulker exits before the error group, + // this will tear down the error group and g.Wait() will return. + // Otherwise it will be a noop. 
+ g.Go(func() (err error) { + select { + case err = <-errCh: + case <-ctx.Done(): + err = ctx.Err() + } + return + }) + + if tracer != nil { + go func() { + <-ctx.Done() + log.Info().Msg("flushing instrumentation tracer...") + tracer.Flush(nil) + tracer.Close() + }() + } + + if err = f.runSubsystems(ctx, cfg, g, bulker, tracer); err != nil { + return err + } + + return g.Wait() +} + +func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) { + esCli := bulker.Client() + + // Check version compatibility with Elasticsearch + remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version) + if err != nil { + if len(remoteVersion) != 0 { + return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", + f.bi.Version, remoteVersion, err) + } + return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err) + } + + // Run migrations + loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { + return dl.Migrate(ctx, bulker) + }) + if err = loggedMigration(); err != nil { + return fmt.Errorf("failed to run subsystems: %w", err) + } + + // Run scheduler for periodic GC/cleanup + gcCfg := cfg.Inputs[0].Server.GC + sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval)) + if err != nil { + return fmt.Errorf("failed to create elasticsearch GC: %w", err) + } + g.Go(loggedRunFunc(ctx, "Elasticsearch GC", sched.Run)) + + // Monitoring es client, longer timeout, no retries + monCli, err := es.NewClient(ctx, cfg, true, elasticsearchOptions( + cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, + )...) + if err != nil { + return err + } + + // Coordinator policy monitor + pim, err := monitor.New(dl.FleetPolicies, esCli, monCli, + monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), + monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), + ) + if err != nil { + return err + } + + g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run)) + cord := coordinator.NewMonitor(cfg.Fleet, f.bi.Version, bulker, pim, coordinator.NewCoordinatorZero) + g.Go(loggedRunFunc(ctx, "Coordinator policy monitor", cord.Run)) + + // Policy monitor + pm := policy.NewMonitor(bulker, pim, cfg.Inputs[0].Server.Limits.PolicyThrottle) + g.Go(loggedRunFunc(ctx, "Policy monitor", pm.Run)) + + // Policy self monitor + sm := policy.NewSelfMonitor(cfg.Fleet, bulker, pim, cfg.Inputs[0].Policy.ID, f.reporter) + g.Go(loggedRunFunc(ctx, "Policy self monitor", sm.Run)) + + // Actions monitoring + var am monitor.SimpleMonitor + var ad *action.Dispatcher + var tr *action.TokenResolver + + am, err = monitor.NewSimple(dl.FleetActions, esCli, monCli, + monitor.WithExpiration(true), + monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), + monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), + ) + if err != nil { + return err + } + g.Go(loggedRunFunc(ctx, "Revision monitor", am.Run)) + + ad = action.NewDispatcher(am) + g.Go(loggedRunFunc(ctx, "Revision dispatcher", ad.Run)) + tr, err = action.NewTokenResolver(bulker) + if err != nil { + return err + } + + bc := checkin.NewBulk(bulker) + g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) + + ct := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, tr, bulker) + et, err := api.NewEnrollerT(f.verCon, &cfg.Inputs[0].Server, bulker, f.cache) + if err != nil { + return err + } + + at := api.NewArtifactT(&cfg.Inputs[0].Server, bulker, 
f.cache) + ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache) + st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache) + + router := api.NewRouter(&cfg.Inputs[0].Server, bulker, ct, et, at, ack, st, sm, tracer, f.bi) + + g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error { + return router.Run(ctx) + })) + + return err +} + +// Reload reloads the fleet server with the latest configuration. +func (f *Fleet) Reload(ctx context.Context, cfg *config.Config) error { + select { + case f.cfgCh <- cfg: + case <-ctx.Done(): + } + return nil +} + +func (f *Fleet) initTracer(cfg config.Instrumentation) (*apm.Tracer, error) { + if !cfg.Enabled { + return nil, nil + } + + log.Info().Msg("fleet-server instrumentation is enabled") + + // TODO(marclop): Ideally, we'd use apmtransport.NewHTTPTransportOptions() + // but it doesn't exist today. Update this code once we have something + // available via the APM Go agent. + const ( + envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" + envServerCert = "ELASTIC_APM_SERVER_CERT" + envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" + envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" + envTransactionSampleRate = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE" + ) + if cfg.TLS.SkipVerify { + os.Setenv(envVerifyServerCert, "false") + defer os.Unsetenv(envVerifyServerCert) + } + if cfg.TLS.ServerCertificate != "" { + os.Setenv(envServerCert, cfg.TLS.ServerCertificate) + defer os.Unsetenv(envServerCert) + } + if cfg.TLS.ServerCA != "" { + os.Setenv(envCACert, cfg.TLS.ServerCA) + defer os.Unsetenv(envCACert) + } + if cfg.GlobalLabels != "" { + os.Setenv(envGlobalLabels, cfg.GlobalLabels) + defer os.Unsetenv(envGlobalLabels) + } + if cfg.TransactionSampleRate != "" { + os.Setenv(envTransactionSampleRate, cfg.TransactionSampleRate) + defer os.Unsetenv(envTransactionSampleRate) + } + transport, err := apmtransport.NewHTTPTransport() + if err != nil { + return nil, err + } + + if len(cfg.Hosts) > 0 { + hosts := make([]*url.URL, 0, len(cfg.Hosts)) + for _, host := range cfg.Hosts { + u, err := url.Parse(host) + if err != nil { + return nil, fmt.Errorf("failed parsing %s: %w", host, err) + } + hosts = append(hosts, u) + } + transport.SetServerURL(hosts...) + } + if cfg.APIKey != "" { + transport.SetAPIKey(cfg.APIKey) + } else { + transport.SetSecretToken(cfg.SecretToken) + } + return apm.NewTracerOptions(apm.TracerOptions{ + ServiceName: "fleet-server", + ServiceVersion: f.bi.Version, + ServiceEnvironment: cfg.Environment, + Transport: transport, + }) +} + +func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption { + options := []es.ConfigOption{es.WithUserAgent(kUAFleetServer, bi)} + if instumented { + options = append(options, es.InstrumentRoundTripper()) + } + return options +} diff --git a/internal/pkg/server/server.go b/internal/pkg/server/server.go new file mode 100644 index 000000000..b8f4c1a0a --- /dev/null +++ b/internal/pkg/server/server.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Package server defines the fleet-server instance. +package server + +import ( + "context" +) + +// Server defines the interface to run the service instance. +type Server interface { + Run(context.Context) error +}
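
Usage sketch (illustrative): a minimal example of how the relocated server package is consumed after this change, mirroring getRunCommand in cmd/fleet/main.go above. Only server.NewFleet, server.NewAgent, status.NewLog, and Server.Run are taken from the patch itself; the surrounding function, and the cfg and bi values, are assumed to come from the existing config-loading and build-info plumbing.

    package main

    import (
        "context"

        "github.com/elastic/fleet-server/v7/internal/pkg/build"
        "github.com/elastic/fleet-server/v7/internal/pkg/config"
        "github.com/elastic/fleet-server/v7/internal/pkg/server"
        "github.com/elastic/fleet-server/v7/internal/pkg/status"
    )

    // runStandalone starts fleet-server in self-managed mode. When running under
    // the elastic-agent, the equivalent entry point in the patch is
    // server.NewAgent(cliCfg, os.Stdin, bi, l), which reads its configuration
    // from the agent over stdin instead of taking a *config.Config directly.
    func runStandalone(ctx context.Context, cfg *config.Config, bi build.Info) error {
        // server.NewFleet replaces the old NewFleetServer constructor that used
        // to live in cmd/fleet/main.go.
        srv, err := server.NewFleet(cfg, bi, status.NewLog())
        if err != nil {
            return err
        }
        // Run blocks until ctx is cancelled or the server exits with an error.
        return srv.Run(ctx)
    }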