diff --git a/cmd/launcher/control.go b/cmd/launcher/control.go index 6b2742805..ee0dff5d4 100644 --- a/cmd/launcher/control.go +++ b/cmd/launcher/control.go @@ -12,7 +12,7 @@ import ( ) func createHTTPClient(ctx context.Context, k types.Knapsack) (*control.HTTPClient, error) { - k.Slogger().Log(context.TODO(), slog.LevelDebug, + k.Slogger().Log(ctx, slog.LevelDebug, "creating control http client", ) @@ -35,7 +35,7 @@ func createControlService(ctx context.Context, store types.GetterSetter, k types ctx, span := traces.StartSpan(ctx) defer span.End() - k.Slogger().Log(context.TODO(), slog.LevelDebug, + k.Slogger().Log(ctx, slog.LevelDebug, "creating control service", ) diff --git a/cmd/launcher/extension.go b/cmd/launcher/extension.go index 59b06f487..250fff4b5 100644 --- a/cmd/launcher/extension.go +++ b/cmd/launcher/extension.go @@ -6,45 +6,16 @@ import ( "fmt" "log/slog" "os" - "path/filepath" "github.com/go-kit/kit/log" - "github.com/kolide/kit/actor" - "github.com/kolide/launcher/cmd/launcher/internal" "github.com/kolide/launcher/ee/agent/types" - kolidelog "github.com/kolide/launcher/ee/log/osquerylogs" - "github.com/kolide/launcher/pkg/augeas" "github.com/kolide/launcher/pkg/contexts/ctxlog" "github.com/kolide/launcher/pkg/osquery" - "github.com/kolide/launcher/pkg/osquery/runtime" - ktable "github.com/kolide/launcher/pkg/osquery/table" "github.com/kolide/launcher/pkg/service" "github.com/kolide/launcher/pkg/traces" - "github.com/osquery/osquery-go/plugin/config" - "github.com/osquery/osquery-go/plugin/distributed" - osquerylogger "github.com/osquery/osquery-go/plugin/logger" ) -// actorQuerier is a type wrapper over kolide/kit/actor. This should -// probably all be refactored into reasonable interfaces. But that's -// going to be pretty extensive work. -type actorQuerier struct { - actor.Actor - querier func(query string) ([]map[string]string, error) -} - -func (aq actorQuerier) Query(query string) ([]map[string]string, error) { - return aq.querier(query) -} - -// TODO: the extension, runtime, and client are all kind of entangled -// here. 
Untangle the underlying libraries and separate into units -func createExtensionRuntime(ctx context.Context, k types.Knapsack, launcherClient service.KolideService) ( - run *actorQuerier, - restart func() error, // restart osqueryd runner - shutdown func() error, // shutdown osqueryd runner - err error, -) { +func createExtensionRuntime(ctx context.Context, k types.Knapsack, launcherClient service.KolideService) (*osquery.Extension, error) { ctx, span := traces.StartSpan(ctx) defer span.End() @@ -58,7 +29,7 @@ func createExtensionRuntime(ctx context.Context, k types.Knapsack, launcherClien } else if k.EnrollSecretPath() != "" { content, err := os.ReadFile(k.EnrollSecretPath()) if err != nil { - return nil, nil, nil, fmt.Errorf("could not read enroll_secret_path: %s: %w", k.EnrollSecretPath(), err) + return nil, fmt.Errorf("could not read enroll_secret_path: %s: %w", k.EnrollSecretPath(), err) } enrollSecret = string(bytes.TrimSpace(content)) } @@ -95,199 +66,5 @@ func createExtensionRuntime(ctx context.Context, k types.Knapsack, launcherClien } // create the extension - ext, err := osquery.NewExtension(ctx, launcherClient, k, extOpts) - if err != nil { - return nil, nil, nil, fmt.Errorf("starting grpc extension: %w", err) - } - - var runnerOptions []runtime.OsqueryInstanceOption - - if k.Transport() == "osquery" { - var err error - runnerOptions, err = osqueryRunnerOptions(logger, k) - if err != nil { - return nil, nil, nil, fmt.Errorf("creating osquery runner options: %w", err) - } - } else { - runnerOptions = grpcRunnerOptions(logger, k, ext) - } - - runner := runtime.LaunchUnstartedInstance(runnerOptions...) - - restartFunc := func() error { - slogger.Log(context.TODO(), slog.LevelDebug, - "osquery instance runner restart called", - ) - - return runner.Restart() - } - - osqCtx, osqCancel := context.WithCancel(ctx) - - return &actorQuerier{ - Actor: actor.Actor{ - // and the methods for starting and stopping the extension - Execute: func() error { - // Attempt to enroll before starting up osquery. If we can't enroll now, don't error out -- - // we'll attempt again the first time osquery calls launcher plugins. - _, invalid, err := ext.Enroll(osqCtx) - if err != nil { - slogger.Log(osqCtx, slog.LevelDebug, - "error performing enrollment", - "err", err, - ) - } else if invalid { - slogger.Log(osqCtx, slog.LevelDebug, - "invalid enroll secret", - "err", err, - ) - } - - // Start the osqueryd instance -- pass in cancel so the osquery runner can let - // this function know to stop waiting when the runner shuts down - if err := runner.Start(osqCancel); err != nil { - return fmt.Errorf("launching osquery instance: %w", err) - } - - // If we're using osquery transport, we don't need the extension - if k.Transport() == "osquery" { - slogger.Log(osqCtx, slog.LevelDebug, - "using osquery transport, skipping extension startup", - ) - - // TODO: remove when underlying libs are refactored - // everything exits right now, so block this actor on the context finishing - <-osqCtx.Done() - return nil - } - - // The runner allows querying the osqueryd instance from the extension. 
- ext.SetQuerier(runner) - - // start the extension - ext.Start() - - slogger.Log(osqCtx, slog.LevelInfo, - "extension started", - ) - - // TODO: remove when underlying libs are refactored - // everything exits right now, so block this actor on the context finishing - <-osqCtx.Done() - return nil - }, - Interrupt: func(_ error) { - ext.Shutdown() - if runner != nil { - if err := runner.Shutdown(); err != nil { - slogger.Log(osqCtx, slog.LevelInfo, - "error shutting down runtime", - "err", err, - ) - slogger.Log(osqCtx, slog.LevelDebug, - "error shutting down runtime", - "err", err, - "stack", fmt.Sprintf("%+v", err), - ) - } - } - osqCancel() - }, - }, - querier: runner.Query, - }, - restartFunc, - runner.Shutdown, - nil -} - -// commonRunnerOptions returns osquery runtime options common to all transports -func commonRunnerOptions(logger log.Logger, k types.Knapsack) []runtime.OsqueryInstanceOption { - // create the logging adapters for osquery - osqueryStderrLogger := kolidelog.NewOsqueryLogAdapter( - k.Slogger().With( - "component", "osquery", - "osqlevel", "stderr", - ), - k.RootDirectory(), - kolidelog.WithLevel(slog.LevelInfo), - ) - osqueryStdoutLogger := kolidelog.NewOsqueryLogAdapter( - k.Slogger().With( - "component", "osquery", - "osqlevel", "stdout", - ), - k.RootDirectory(), - kolidelog.WithLevel(slog.LevelDebug), - ) - - return []runtime.OsqueryInstanceOption{ - runtime.WithKnapsack(k), - runtime.WithOsquerydBinary(k.OsquerydPath()), - runtime.WithRootDirectory(k.RootDirectory()), - runtime.WithOsqueryExtensionPlugins(ktable.LauncherTables(k)...), - runtime.WithStdout(osqueryStdoutLogger), - runtime.WithStderr(osqueryStderrLogger), - runtime.WithLogger(logger), - runtime.WithOsqueryVerbose(k.OsqueryVerbose()), - runtime.WithOsqueryFlags(k.OsqueryFlags()), - runtime.WithAugeasLensFunction(augeas.InstallLenses), - runtime.WithAutoloadedExtensions(k.AutoloadedExtensions()...), - runtime.WithUpdateDirectory(k.UpdateDirectory()), - runtime.WithUpdateChannel(k.UpdateChannel()), - } -} - -// osqueryRunnerOptions returns the osquery runtime options when using native osquery transport -func osqueryRunnerOptions(logger log.Logger, k types.Knapsack) ([]runtime.OsqueryInstanceOption, error) { - // As osquery requires TLS server certs, we'll use our embedded defaults if not specified - caCertFile := k.RootPEM() - if caCertFile == "" { - var err error - caCertFile, err = internal.InstallCaCerts(k.RootDirectory()) - if err != nil { - return nil, fmt.Errorf("writing CA certs: %w", err) - } - } - - runtimeOptions := append( - commonRunnerOptions(logger, k), - runtime.WithConfigPluginFlag("tls"), - runtime.WithDistributedPluginFlag("tls"), - runtime.WithLoggerPluginFlag("tls"), - runtime.WithTlsConfigEndpoint(k.OsqueryTlsConfigEndpoint()), - runtime.WithTlsDistributedReadEndpoint(k.OsqueryTlsDistributedReadEndpoint()), - runtime.WithTlsDistributedWriteEndpoint(k.OsqueryTlsDistributedWriteEndpoint()), - runtime.WithTlsEnrollEndpoint(k.OsqueryTlsEnrollEndpoint()), - runtime.WithTlsHostname(k.KolideServerURL()), - runtime.WithTlsLoggerEndpoint(k.OsqueryTlsLoggerEndpoint()), - runtime.WithTlsServerCerts(caCertFile), - ) - - // Enroll secrets... 
Either we pass a file, or we write a - // secret, and pass _that_ file - if k.EnrollSecretPath() != "" { - runtimeOptions = append(runtimeOptions, runtime.WithEnrollSecretPath(k.EnrollSecretPath())) - } else if k.EnrollSecret() != "" { - filename := filepath.Join(k.RootDirectory(), "secret") - os.WriteFile(filename, []byte(k.EnrollSecret()), 0400) - runtimeOptions = append(runtimeOptions, runtime.WithEnrollSecretPath(filename)) - } - - return runtimeOptions, nil -} - -// grpcRunnerOptions returns the osquery runtime options when using launcher transports. (Eg: grpc or jsonrpc) -func grpcRunnerOptions(logger log.Logger, k types.Knapsack, ext *osquery.Extension) []runtime.OsqueryInstanceOption { - return append( - commonRunnerOptions(logger, k), - runtime.WithConfigPluginFlag("kolide_grpc"), - runtime.WithLoggerPluginFlag("kolide_grpc"), - runtime.WithDistributedPluginFlag("kolide_grpc"), - runtime.WithOsqueryExtensionPlugins( - config.NewPlugin("kolide_grpc", ext.GenerateConfigs), - distributed.NewPlugin("kolide_grpc", ext.GetQueries, ext.WriteResults), - osquerylogger.NewPlugin("kolide_grpc", ext.LogString), - ), - ) + return osquery.NewExtension(ctx, launcherClient, k, extOpts) } diff --git a/cmd/launcher/launcher.go b/cmd/launcher/launcher.go index 5054497a2..fdcee4f52 100644 --- a/cmd/launcher/launcher.go +++ b/cmd/launcher/launcher.go @@ -38,8 +38,10 @@ import ( "github.com/kolide/launcher/ee/debug/checkups" desktopRunner "github.com/kolide/launcher/ee/desktop/runner" "github.com/kolide/launcher/ee/localserver" + kolidelog "github.com/kolide/launcher/ee/log/osquerylogs" "github.com/kolide/launcher/ee/powereventwatcher" "github.com/kolide/launcher/ee/tuf" + "github.com/kolide/launcher/pkg/augeas" "github.com/kolide/launcher/pkg/backoff" "github.com/kolide/launcher/pkg/contexts/ctxlog" "github.com/kolide/launcher/pkg/debug" @@ -49,11 +51,16 @@ import ( "github.com/kolide/launcher/pkg/log/teelogger" "github.com/kolide/launcher/pkg/osquery" "github.com/kolide/launcher/pkg/osquery/runsimple" + osqueryruntime "github.com/kolide/launcher/pkg/osquery/runtime" osqueryInstanceHistory "github.com/kolide/launcher/pkg/osquery/runtime/history" + "github.com/kolide/launcher/pkg/osquery/table" "github.com/kolide/launcher/pkg/rungroup" "github.com/kolide/launcher/pkg/service" "github.com/kolide/launcher/pkg/traces" "github.com/kolide/launcher/pkg/traces/exporter" + "github.com/osquery/osquery-go/plugin/config" + "github.com/osquery/osquery-go/plugin/distributed" + osquerylogger "github.com/osquery/osquery-go/plugin/logger" "go.etcd.io/bbolt" ) @@ -308,12 +315,52 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl return fmt.Errorf("error initializing osquery instance history: %w", err) } - // create the osquery extension for launcher. This is where osquery itself is launched. 
- extension, runnerRestart, _, err := createExtensionRuntime(ctx, k, client) + // create the osquery extension + extension, err := createExtensionRuntime(ctx, k, client) if err != nil { return fmt.Errorf("create extension with runtime: %w", err) } - runGroup.Add("osqueryExtension", extension.Execute, extension.Interrupt) + runGroup.Add("osqueryExtension", extension.Execute, extension.Shutdown) + // create the runner that will launch osquery + osqueryRunner := osqueryruntime.New( + k, + osqueryruntime.WithKnapsack(k), + osqueryruntime.WithOsquerydBinary(k.OsquerydPath()), + osqueryruntime.WithRootDirectory(k.RootDirectory()), + osqueryruntime.WithOsqueryExtensionPlugins(table.LauncherTables(k)...), + osqueryruntime.WithLogger(logger), + osqueryruntime.WithOsqueryVerbose(k.OsqueryVerbose()), + osqueryruntime.WithOsqueryFlags(k.OsqueryFlags()), + osqueryruntime.WithStdout(kolidelog.NewOsqueryLogAdapter( + k.Slogger().With( + "component", "osquery", + "osqlevel", "stdout", + ), + k.RootDirectory(), + kolidelog.WithLevel(slog.LevelDebug), + )), + osqueryruntime.WithStderr(kolidelog.NewOsqueryLogAdapter( + k.Slogger().With( + "component", "osquery", + "osqlevel", "stderr", + ), + k.RootDirectory(), + kolidelog.WithLevel(slog.LevelInfo), + )), + osqueryruntime.WithAugeasLensFunction(augeas.InstallLenses), + osqueryruntime.WithAutoloadedExtensions(k.AutoloadedExtensions()...), + osqueryruntime.WithUpdateDirectory(k.UpdateDirectory()), + osqueryruntime.WithUpdateChannel(k.UpdateChannel()), + osqueryruntime.WithConfigPluginFlag("kolide_grpc"), + osqueryruntime.WithLoggerPluginFlag("kolide_grpc"), + osqueryruntime.WithDistributedPluginFlag("kolide_grpc"), + osqueryruntime.WithOsqueryExtensionPlugins( + config.NewPlugin("kolide_grpc", extension.GenerateConfigs), + distributed.NewPlugin("kolide_grpc", extension.GetQueries, extension.WriteResults), + osquerylogger.NewPlugin("kolide_grpc", extension.LogString), + ), + ) + runGroup.Add("osqueryRunner", osqueryRunner.Run, osqueryRunner.Interrupt) versionInfo := version.Version() k.SystemSlogger().Log(ctx, slog.LevelInfo, @@ -323,7 +370,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl ) if traceExporter != nil { - traceExporter.SetOsqueryClient(extension) + traceExporter.SetOsqueryClient(osqueryRunner) } // Create the control service and services that depend on it @@ -436,7 +483,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl ) } - ls.SetQuerier(extension) + ls.SetQuerier(osqueryRunner) runGroup.Add("localserver", ls.Start, ls.Interrupt) } @@ -452,8 +499,8 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl k, metadataClient, mirrorClient, - extension, - tuf.WithOsqueryRestart(runnerRestart), + osqueryRunner, + tuf.WithOsqueryRestart(osqueryRunner.Restart), ) if err != nil { return fmt.Errorf("creating TUF autoupdater updater: %w", err) diff --git a/cmd/launcher/run_socket.go b/cmd/launcher/run_socket.go index 6602aa7b3..dcef8bf8f 100644 --- a/cmd/launcher/run_socket.go +++ b/cmd/launcher/run_socket.go @@ -1,7 +1,6 @@ package main import ( - "context" "flag" "fmt" "os" @@ -9,9 +8,14 @@ import ( "path/filepath" "syscall" + "github.com/go-kit/kit/log" "github.com/kolide/kit/env" "github.com/kolide/kit/fsutil" "github.com/kolide/launcher/ee/agent" + "github.com/kolide/launcher/ee/agent/flags" + "github.com/kolide/launcher/ee/agent/knapsack" + 
"github.com/kolide/launcher/ee/agent/storage/inmemory" + "github.com/kolide/launcher/pkg/launcher" "github.com/kolide/launcher/pkg/osquery/runtime" "github.com/kolide/launcher/pkg/osquery/table" ) @@ -49,11 +53,17 @@ func runSocket(args []string) error { opts = append(opts, runtime.WithOsqueryExtensionPlugins(table.LauncherTables(nil)...)) } - _, cancel := context.WithCancel(context.Background()) - runner, err := runtime.LaunchInstance(cancel, opts...) + // were passing an empty array here just to get the default options + cmdlineopts, err := launcher.ParseOptions("socket", make([]string, 0)) if err != nil { - return fmt.Errorf("creating osquery instance: %w", err) + return err } + logger := log.NewLogfmtLogger(os.Stdout) + fcOpts := []flags.Option{flags.WithCmdLineOpts(cmdlineopts)} + flagController := flags.NewFlagController(logger, inmemory.NewStore(), fcOpts...) + k := knapsack.New(nil, flagController, nil, nil, nil) + runner := runtime.New(k, opts...) + go runner.Run() fmt.Println(*flPath) diff --git a/pkg/osquery/extension.go b/pkg/osquery/extension.go index ce71add2f..76f05e734 100644 --- a/pkg/osquery/extension.go +++ b/pkg/osquery/extension.go @@ -44,23 +44,7 @@ type Extension struct { enrollMutex sync.Mutex done chan struct{} interrupted bool - wg sync.WaitGroup slogger *slog.Logger - - initialRunner *initialRunner -} - -// SetQuerier sets an osquery client on the extension, allowing -// the extension to query the running osqueryd instance. -func (e *Extension) SetQuerier(client Querier) { - if e.initialRunner != nil { - e.initialRunner.client = client - } -} - -// Querier allows querying osquery. -type Querier interface { - Query(sql string) ([]map[string]string, error) } const ( @@ -126,6 +110,8 @@ func NewExtension(ctx context.Context, client service.KolideService, k types.Kna _, span := traces.StartSpan(ctx) defer span.End() + slogger := k.Slogger().With("component", "osquery_extension") + if opts.EnrollSecret == "" { return nil, errors.New("empty enroll secret") } @@ -160,57 +146,53 @@ func NewExtension(ctx context.Context, client service.KolideService, k types.Kna return nil, fmt.Errorf("setting up agent keys: %w", err) } - identifier, err := IdentifierFromDB(configStore) - if err != nil { - return nil, fmt.Errorf("get host identifier from db when creating new extension: %w", err) - } - nodekey, err := NodeKey(configStore) if err != nil { - k.Slogger().Log(context.TODO(), slog.LevelDebug, + slogger.Log(ctx, slog.LevelDebug, "NewExtension got error reading nodekey. Ignoring", "err", err, ) return nil, fmt.Errorf("reading nodekey from db: %w", err) } else if nodekey == "" { - k.Slogger().Log(context.TODO(), slog.LevelDebug, + slogger.Log(ctx, slog.LevelDebug, "NewExtension did not find a nodekey. 
Likely first enroll", ) } else { - k.Slogger().Log(context.TODO(), slog.LevelDebug, + slogger.Log(ctx, slog.LevelDebug, "NewExtension found existing nodekey", ) } - initialRunner := &initialRunner{ - slogger: k.Slogger().With("component", "initial_runner"), - identifier: identifier, - store: k.InitialResultsStore(), - enabled: false, // currently, we don't want to run the initial runner, even if the flag is set - } - return &Extension{ - slogger: k.Slogger().With("component", "osquery_extension"), + slogger: slogger, serviceClient: client, knapsack: k, NodeKey: nodekey, Opts: opts, done: make(chan struct{}), - initialRunner: initialRunner, }, nil } -// Start begins the goroutines responsible for background processing (currently -// just the log buffer flushing routine). It should be shut down by calling the -// Shutdown() method. -func (e *Extension) Start() { - e.wg.Add(1) - go e.writeLogsLoopRunner() +func (e *Extension) Execute() error { + // Process logs until shutdown + ticker := e.Opts.Clock.NewTicker(e.Opts.LoggingInterval) + defer ticker.Stop() + for { + e.writeAndPurgeLogs() + + // select to either exit or write another batch of logs + select { + case <-e.done: + return nil + case <-ticker.Chan(): + // Resume loop + } + } } // Shutdown should be called to cleanup the resources and goroutines associated // with this extension. -func (e *Extension) Shutdown() { +func (e *Extension) Shutdown(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. if e.interrupted { return @@ -218,7 +200,6 @@ func (e *Extension) Shutdown() { e.interrupted = true close(e.done) - e.wg.Wait() } // getHostIdentifier returns the UUID identifier associated with this host. If @@ -588,10 +569,6 @@ func (e *Extension) generateConfigsWithReenroll(ctx context.Context, reenroll bo } config = e.setVerbose(config, osqueryVerbose) - if err := e.initialRunner.Execute(config, e.writeLogsWithReenroll); err != nil { - return "", fmt.Errorf("initial run results: %w", err) - } - return config, nil } @@ -697,23 +674,6 @@ func (e *Extension) writeAndPurgeLogs() { } } -func (e *Extension) writeLogsLoopRunner() { - defer e.wg.Done() - ticker := e.Opts.Clock.NewTicker(e.Opts.LoggingInterval) - defer ticker.Stop() - for { - e.writeAndPurgeLogs() - - // select to either exit or write another batch of logs - select { - case <-e.done: - return - case <-ticker.Chan(): - // Resume loop - } - } -} - // numberOfBufferedLogs returns the number of logs buffered for a given type. 
func (e *Extension) numberOfBufferedLogs(typ logger.LogType) (int, error) { bucketName, err := bucketNameFromLogType(typ) @@ -1018,3 +978,11 @@ func (e *Extension) writeResultsWithReenroll(ctx context.Context, results []dist return nil } + +func minInt(a, b int) int { + if a < b { + return a + } + + return b +} diff --git a/pkg/osquery/extension_test.go b/pkg/osquery/extension_test.go index 649ac258b..7a8c33615 100644 --- a/pkg/osquery/extension_test.go +++ b/pkg/osquery/extension_test.go @@ -57,14 +57,15 @@ func makeKnapsack(t *testing.T, db *bbolt.DB) types.Knapsack { m.On("OsquerydPath").Maybe().Return("") m.On("LatestOsquerydPath", testifymock.Anything).Maybe().Return("") m.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - m.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) m.On("Slogger").Return(multislogger.New().Logger) return m } func TestNewExtensionEmptyEnrollSecret(t *testing.T) { + m := mocks.NewKnapsack(t) + m.On("Slogger").Return(multislogger.New().Logger) - e, err := NewExtension(context.TODO(), &mock.KolideService{}, nil, ExtensionOpts{}) + e, err := NewExtension(context.TODO(), &mock.KolideService{}, m, ExtensionOpts{}) assert.NotNil(t, err) assert.Nil(t, e) } @@ -165,7 +166,6 @@ func TestExtensionEnrollTransportError(t *testing.T) { e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) - e.SetQuerier(mockClient{}) key, invalid, err := e.Enroll(context.Background()) assert.True(t, m.RequestEnrollmentFuncInvoked) @@ -174,23 +174,6 @@ func TestExtensionEnrollTransportError(t *testing.T) { assert.NotNil(t, err) } -type mockClient struct{} - -func (mockClient) Query(sql string) ([]map[string]string, error) { - return []map[string]string{ - { - "os_version": "", - "launcher_version": "", - "os_build": "", - "platform": "", - "hostname": "", - "hardware_vendor": "", - "hardware_model": "", - "osquery_version": "", - }, - }, nil -} - func TestExtensionEnrollSecretInvalid(t *testing.T) { m := &mock.KolideService{ @@ -203,7 +186,6 @@ func TestExtensionEnrollSecretInvalid(t *testing.T) { defer cleanup() e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) - e.SetQuerier(mockClient{}) key, invalid, err := e.Enroll(context.Background()) assert.True(t, m.RequestEnrollmentFuncInvoked) @@ -228,7 +210,6 @@ func TestExtensionEnroll(t *testing.T) { expectedEnrollSecret := "foo_secret" e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: expectedEnrollSecret}) require.Nil(t, err) - e.SetQuerier(mockClient{}) key, invalid, err := e.Enroll(context.Background()) require.Nil(t, err) @@ -248,7 +229,6 @@ func TestExtensionEnroll(t *testing.T) { e, err = NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: expectedEnrollSecret}) require.Nil(t, err) - e.SetQuerier(mockClient{}) // Still should not re-enroll (because node key stored in DB) key, invalid, err = e.Enroll(context.Background()) require.Nil(t, err) @@ -341,7 +321,6 @@ func TestExtensionGenerateConfigsEnrollmentInvalid(t *testing.T) { e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) e.NodeKey = "bad_node_key" - e.SetQuerier(mockClient{}) configs, err := e.GenerateConfigs(context.Background()) assert.True(t, m.RequestConfigFuncInvoked) @@ -408,7 +387,6 @@ func TestExtensionWriteLogsEnrollmentInvalid(t *testing.T) { e, err := 
NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) e.NodeKey = "bad_node_key" - e.SetQuerier(mockClient{}) err = e.writeLogsWithReenroll(context.Background(), logger.LogTypeString, []string{"foobar"}, true) assert.True(t, m.PublishLogsFuncInvoked) @@ -507,7 +485,6 @@ func TestExtensionWriteBufferedLogsEmpty(t *testing.T) { k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger).Maybe() @@ -546,7 +523,6 @@ func TestExtensionWriteBufferedLogs(t *testing.T) { k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger).Maybe() @@ -614,7 +590,6 @@ func TestExtensionWriteBufferedLogsEnrollmentInvalid(t *testing.T) { k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("OsquerydPath").Maybe().Return("") k.On("LatestOsquerydPath", testifymock.Anything).Maybe().Return("") @@ -622,7 +597,6 @@ func TestExtensionWriteBufferedLogsEnrollmentInvalid(t *testing.T) { e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) - e.SetQuerier(mockClient{}) e.LogString(context.Background(), logger.LogTypeStatus, "status foo") e.LogString(context.Background(), logger.LogTypeStatus, "status bar") @@ -657,13 +631,11 @@ func TestExtensionWriteBufferedLogsLimit(t *testing.T) { defer cleanup() // Create the status logs bucket ahead of time - // agentbbolt.NewStore(log.NewNopLogger(), db, storage.InitialResultsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.StatusLogsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.ResultLogsStore.String()) k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger) @@ -733,13 +705,11 @@ func TestExtensionWriteBufferedLogsDropsBigLog(t *testing.T) { defer cleanup() // Create the status logs bucket ahead of time - // agentbbolt.NewStore(log.NewNopLogger(), db, storage.InitialResultsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.StatusLogsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.ResultLogsStore.String()) k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger) @@ -818,13 +788,11 @@ func TestExtensionWriteLogsLoop(t *testing.T) { defer cleanup() // Create the status logs bucket ahead of time - // agentbbolt.NewStore(log.NewNopLogger(), db, 
storage.InitialResultsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.StatusLogsStore.String()) agentbbolt.NewStore(log.NewNopLogger(), db, storage.ResultLogsStore.String()) k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger) @@ -851,7 +819,7 @@ func TestExtensionWriteLogsLoop(t *testing.T) { } // Should write first 10 logs - e.Start() + go e.Execute() testutil.FatalAfterFunc(t, 1*time.Second, func() { // PublishLogsFunc runs twice for each run of the loop <-done @@ -898,7 +866,7 @@ func TestExtensionWriteLogsLoop(t *testing.T) { assert.Nil(t, gotResultLogs) testutil.FatalAfterFunc(t, 3*time.Second, func() { - e.Shutdown() + e.Shutdown(errors.New("test error")) }) // Confirm we can call Shutdown multiple times without blocking @@ -906,7 +874,7 @@ func TestExtensionWriteLogsLoop(t *testing.T) { expectedInterrupts := 3 for i := 0; i < expectedInterrupts; i += 1 { go func() { - e.Shutdown() + e.Shutdown(errors.New("test error")) interruptComplete <- struct{}{} }() } @@ -956,7 +924,6 @@ func TestExtensionPurgeBufferedLogs(t *testing.T) { k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("BboltDB").Return(db) k.On("Slogger").Return(multislogger.New().Logger) @@ -1023,7 +990,6 @@ func TestExtensionGetQueriesEnrollmentInvalid(t *testing.T) { k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String())) - k.On("InitialResultsStore").Return(storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String())) k.On("OsquerydPath").Maybe().Return("") k.On("LatestOsquerydPath", testifymock.Anything).Maybe().Return("") k.On("Slogger").Return(multislogger.New().Logger) @@ -1031,7 +997,6 @@ func TestExtensionGetQueriesEnrollmentInvalid(t *testing.T) { e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) e.NodeKey = "bad_node_key" - e.SetQuerier(mockClient{}) queries, err := e.GetQueries(context.Background()) assert.True(t, m.RequestQueriesFuncInvoked) @@ -1103,7 +1068,6 @@ func TestExtensionWriteResultsEnrollmentInvalid(t *testing.T) { e, err := NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) require.Nil(t, err) e.NodeKey = "bad_node_key" - e.SetQuerier(mockClient{}) err = e.WriteResults(context.Background(), []distributed.Result{}) assert.True(t, m.PublishResultsFuncInvoked) @@ -1146,12 +1110,10 @@ func TestLauncherRsaKeys(t *testing.T) { configStore, err := storageci.NewStore(t, log.NewNopLogger(), storage.ConfigStore.String()) require.NoError(t, err) - initialResultsStore, err := storageci.NewStore(t, log.NewNopLogger(), storage.InitialResultsStore.String()) require.NoError(t, err) k := mocks.NewKnapsack(t) k.On("ConfigStore").Return(configStore) - k.On("InitialResultsStore").Return(initialResultsStore) k.On("Slogger").Return(multislogger.New().Logger) _, err = NewExtension(context.TODO(), m, k, ExtensionOpts{EnrollSecret: "enroll_secret"}) diff --git a/pkg/osquery/initial-runner.go b/pkg/osquery/initial-runner.go deleted file mode 100644 index 
d12ab8a36..000000000 --- a/pkg/osquery/initial-runner.go +++ /dev/null @@ -1,146 +0,0 @@ -package osquery - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "log/slog" - "strings" - "time" - - "github.com/kolide/launcher/ee/agent/types" - "github.com/osquery/osquery-go/plugin/logger" -) - -type initialRunner struct { - slogger *slog.Logger - enabled bool - identifier string - client Querier - store types.GetterSetter -} - -func (i *initialRunner) Execute(configBlob string, writeFn func(ctx context.Context, l logger.LogType, results []string, reeenroll bool) error) error { - if !i.enabled { - i.slogger.Log(context.TODO(), slog.LevelDebug, - "initial runner not enabled", - ) - return nil - } - var config OsqueryConfig - if err := json.Unmarshal([]byte(configBlob), &config); err != nil { - return fmt.Errorf("unmarshal osquery config blob: %w", err) - } - - var allQueries []string - for packName, pack := range config.Packs { - // only run queries from kolide packs - if !strings.Contains(packName, "_kolide_") { - continue - } - - // Run all the queries, snapshot and differential - for query := range pack.Queries { - queryName := fmt.Sprintf("pack:%s:%s", packName, query) - allQueries = append(allQueries, queryName) - } - } - - toRun, err := i.queriesToRun(allQueries) - if err != nil { - return fmt.Errorf("checking if query should run: %w", err) - } - - var initialRunResults []OsqueryResultLog - for packName, pack := range config.Packs { - for query, queryContent := range pack.Queries { - queryName := fmt.Sprintf("pack:%s:%s", packName, query) - if _, ok := toRun[queryName]; !ok { - continue - } - resp, err := i.client.Query(queryContent.Query) - // returning here causes the rest of the queries not to run - // this is a bummer because often configs have queries with bad syntax/tables that do not exist. - // log the error and move on. - // using debug to not fill disks. the worst that will happen is that the result will come in later. - i.slogger.Log(context.TODO(), slog.LevelDebug, - "querying for initial results", - "query_name", queryName, - "err", err, - "results", len(resp), - ) - if err != nil || len(resp) == 0 { - continue - } - - initialRunResults = append(initialRunResults, OsqueryResultLog{ - Name: queryName, - HostIdentifier: i.identifier, - UnixTime: int(time.Now().UTC().Unix()), - DiffResults: &DiffResults{Added: resp}, - }) - } - } - - cctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - for _, result := range initialRunResults { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(result); err != nil { - return fmt.Errorf("encoding initial run result: %w", err) - } - if err := writeFn(cctx, logger.LogTypeString, []string{buf.String()}, true); err != nil { - i.slogger.Log(cctx, slog.LevelDebug, - "writing initial result log to server", - "query_name", result.Name, - "err", err, - ) - continue - } - } - - // note: caching would happen always on first use, even if the runner is not enabled. - // This avoids the problem of queries not being known even though they've been in the config for a long time. 
- if err := i.cacheRanQueries(toRun); err != nil { - return err - } - - return nil -} - -func (i *initialRunner) queriesToRun(allFromConfig []string) (map[string]struct{}, error) { - known := make(map[string]struct{}) - - for _, q := range allFromConfig { - knownQuery, err := i.store.Get([]byte(q)) - if err != nil { - return nil, fmt.Errorf("check store for queries to run: %w", err) - } - if knownQuery != nil { - continue - } - known[q] = struct{}{} - } - - return known, nil -} - -func (i *initialRunner) cacheRanQueries(known map[string]struct{}) error { - for q := range known { - if err := i.store.Set([]byte(q), []byte(q)); err != nil { - return fmt.Errorf("cache initial result query %q: %w", q, err) - } - } - - return nil -} - -func minInt(a, b int) int { - if a < b { - return a - } - - return b -} diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 0d3a6f915..7c457abe2 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "os/exec" "runtime" @@ -11,8 +12,9 @@ import ( "sync" "time" - "github.com/go-kit/kit/log/level" "github.com/kolide/launcher/ee/agent/flags/keys" + "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/ee/tuf" "github.com/kolide/launcher/pkg/autoupdate" "github.com/kolide/launcher/pkg/backoff" @@ -22,7 +24,7 @@ import ( "github.com/kolide/launcher/pkg/traces" "github.com/osquery/osquery-go/plugin/config" "github.com/osquery/osquery-go/plugin/distributed" - "github.com/osquery/osquery-go/plugin/logger" + osquerylogger "github.com/osquery/osquery-go/plugin/logger" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -45,109 +47,94 @@ const ( type Runner struct { instance *OsqueryInstance instanceLock sync.Mutex + slogger *slog.Logger + knapsack types.Knapsack shutdown chan struct{} interrupted bool opts []OsqueryInstanceOption } -// LaunchInstance will launch an instance of osqueryd via a very configurable -// API as defined by the various OsqueryInstanceOption functional options. The -// returned instance should be shut down via the Shutdown() method. -// For example, a more customized caller might do something like the following: -// -// instance, err := LaunchInstance( -// WithOsquerydBinary("/usr/local/bin/osqueryd"), -// WithRootDirectory("/var/foobar"), -// WithConfigPluginFlag("custom"), -// WithOsqueryExtensionPlugins( -// config.NewPlugin("custom", custom.GenerateConfigs), -// logger.NewPlugin("custom", custom.LogString), -// tables.NewPlugin("foobar", custom.FoobarColumns, custom.FoobarGenerate), -// ), -// ) -func LaunchInstance(cancel context.CancelFunc, opts ...OsqueryInstanceOption) (*Runner, error) { +func New(k types.Knapsack, opts ...OsqueryInstanceOption) *Runner { runner := newRunner(opts...) - if err := runner.Start(cancel); err != nil { - return nil, err - } - return runner, nil -} + runner.slogger = k.Slogger().With("component", "osquery_runner") + runner.knapsack = k + + k.RegisterChangeObserver(runner, + keys.WatchdogEnabled, keys.WatchdogMemoryLimitMB, keys.WatchdogUtilizationLimitPercent, keys.WatchdogDelaySec, + ) -// LaunchUnstartedInstance sets up a osqueryd instance similar to LaunchInstance, but gives the caller control over -// when the instance will run. Useful for controlling startup and shutdown goroutines. 
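Illustrative aside (not part of the patch): the runner is now constructed with New and driven by Run/Interrupt rather than launched via LaunchInstance. A condensed usage sketch, mirroring the wiring shown in run_socket.go and launcher.go in this diff (the helper function names are mine and the option list is trimmed):

// sketch of the new Runner lifecycle
package sketch

import (
	"errors"

	"github.com/kolide/launcher/ee/agent/types"
	"github.com/kolide/launcher/pkg/osquery/runtime"
)

func startOsquery(k types.Knapsack) *runtime.Runner {
	runner := runtime.New(
		k,
		runtime.WithKnapsack(k),
		runtime.WithRootDirectory(k.RootDirectory()),
		runtime.WithOsquerydBinary(k.OsquerydPath()),
	)

	// Run blocks until the runner is shut down, so it is started on its own
	// goroutine (or handed to a rungroup as runner.Run / runner.Interrupt).
	go func() {
		_ = runner.Run()
	}()

	return runner
}

func stopOsquery(runner *runtime.Runner) {
	// Interrupt wraps Shutdown and is what the rungroup calls on teardown.
	runner.Interrupt(errors.New("shutting down"))
}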
-func LaunchUnstartedInstance(opts ...OsqueryInstanceOption) *Runner { - runner := newRunner(opts...) return runner } -func (r *Runner) Start(cancel context.CancelFunc) error { +func (r *Runner) Run() error { if err := r.launchOsqueryInstance(); err != nil { return fmt.Errorf("starting instance: %w", err) } - r.instance.knapsack.RegisterChangeObserver(r, - keys.WatchdogEnabled, keys.WatchdogMemoryLimitMB, keys.WatchdogUtilizationLimitPercent, keys.WatchdogDelaySec, - ) - go func() { - // This loop waits for the completion of the async routines, - // and either restarts the instance (if Shutdown was not - // called), or stops (if Shutdown was called). - for { - // Wait for async processes to exit - <-r.instance.doneCtx.Done() - level.Info(r.instance.logger).Log("msg", "osquery instance exited") + // This loop waits for the completion of the async routines, + // and either restarts the instance (if Shutdown was not + // called), or stops (if Shutdown was called). + for { + // Wait for async processes to exit + <-r.instance.doneCtx.Done() + r.slogger.Log(context.TODO(), slog.LevelInfo, + "osquery instance exited", + ) - select { - case <-r.shutdown: - // Intentional shutdown, this loop can exit - if err := r.instance.stats.Exited(nil); err != nil { - level.Info(r.instance.logger).Log("msg", "error recording osquery instance exit to history", "err", err) - } - cancel() // let the extension know to shut down - return - default: - // Don't block + select { + case <-r.shutdown: + // Intentional shutdown, this loop can exit + if err := r.instance.stats.Exited(nil); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "error recording osquery instance exit to history", + "err", err, + ) } + return nil + default: + // Don't block + } - // Error case - err := r.instance.errgroup.Wait() - level.Info(r.instance.logger).Log( - "msg", "unexpected restart of instance", + // Error case -- osquery instance shut down and needs to be restarted + err := r.instance.errgroup.Wait() + r.slogger.Log(context.TODO(), slog.LevelInfo, + "unexpected restart of instance", + "err", err, + ) + + if err := r.instance.stats.Exited(err); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "error recording osquery instance exit to history", "err", err, ) + } - if err := r.instance.stats.Exited(err); err != nil { - level.Info(r.instance.logger).Log("msg", "error recording osquery instance exit to history", "err", err) - } - - r.instanceLock.Lock() - opts := r.instance.opts - r.instance = newInstance() - r.instance.opts = opts - for _, opt := range r.opts { - opt(r.instance) - } - if err := r.launchOsqueryInstance(); err != nil { - level.Info(r.instance.logger).Log( - "msg", "fatal error restarting instance, shutting down", + r.instanceLock.Lock() + opts := r.instance.opts + r.instance = newInstance() + r.instance.opts = opts + for _, opt := range r.opts { + opt(r.instance) + } + if err := r.launchOsqueryInstance(); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "fatal error restarting instance, shutting down", + "err", err, + ) + r.instanceLock.Unlock() + if err := r.Shutdown(); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "could not perform shutdown", "err", err, ) - r.instanceLock.Unlock() - if err := r.Shutdown(); err != nil { - level.Error(r.instance.logger).Log( - "msg", "could not perform shutdown", - "err", err, - ) - } - cancel() // let the extension know to shut down - return } - r.instanceLock.Unlock() - + // Failed to restart instance -- exit rungroup so 
launcher can reload + return fmt.Errorf("restarting instance after unexpected exit: %w", err) } - }() - return nil + + r.instanceLock.Unlock() + } } func (r *Runner) Query(query string) ([]map[string]string, error) { @@ -156,6 +143,15 @@ func (r *Runner) Query(query string) ([]map[string]string, error) { return r.instance.Query(query) } +func (r *Runner) Interrupt(_ error) { + if err := r.Shutdown(); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "could not shut down runner on interrupt", + "err", err, + ) + } +} + // Shutdown instructs the runner to permanently stop the running instance (no // restart will be attempted). func (r *Runner) Shutdown() error { @@ -179,17 +175,25 @@ func (r *Runner) Shutdown() error { // that we care about, which are enable_watchdog, watchdog_delay_sec, watchdog_memory_limit_mb, // and watchdog_utilization_limit_percent. func (r *Runner) FlagsChanged(flagKeys ...keys.FlagKey) { - level.Debug(r.instance.logger).Log("msg", "control server flags changed, restarting instance to apply", "flags", fmt.Sprintf("%+v", flagKeys)) + r.slogger.Log(context.TODO(), slog.LevelDebug, + "control server flags changed, restarting instance to apply", + "flags", fmt.Sprintf("%+v", flagKeys), + ) if err := r.Restart(); err != nil { - level.Error(r.instance.logger).Log("msg", "could not restart osquery instance after flag change") + r.slogger.Log(context.TODO(), slog.LevelError, + "could not restart osquery instance after flag change", + "err", err, + ) } } // Restart allows you to cleanly shutdown the current instance and launch a new // instance with the same configurations. func (r *Runner) Restart() error { - level.Debug(r.instance.logger).Log("msg", "runner.Restart called") + r.slogger.Log(context.TODO(), slog.LevelDebug, + "runner.Restart called", + ) r.instanceLock.Lock() defer r.instanceLock.Unlock() // Cancelling will cause all of the cleanup routines to execute, and a @@ -256,8 +260,8 @@ func (r *Runner) launchOsqueryInstance() error { // The extensions files should be owned by the process's UID or by root. // Osquery will refuse to load the extension otherwise. if err := ensureProperPermissions(o, path); err != nil { - level.Info(o.logger).Log( - "msg", "unable to ensure proper permissions on extension path", + r.slogger.Log(ctx, slog.LevelInfo, + "unable to ensure proper permissions on extension path", "path", path, "err", err, ) @@ -291,10 +295,10 @@ func (r *Runner) launchOsqueryInstance() error { // If a logger plugin has not been set by the caller, we set a logger // plugin that outputs logs to the default application logger. 
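Illustrative aside (not part of the patch): most of the remaining changes in this file convert go-kit level.* logging into structured slog calls. A self-contained before/after sketch of that conversion, with placeholder variable names:

// logging_migration.go: the same event logged with the old and new loggers.
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

func main() {
	ctx := context.Background()
	err := os.ErrNotExist

	// Before: go-kit, message passed as a "msg" key/value pair.
	gokitLogger := log.NewLogfmtLogger(os.Stdout)
	level.Info(gokitLogger).Log("msg", "osquery instance exited", "err", err)

	// After: slog, message is a positional argument and the context rides along.
	slogger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slogger.Log(ctx, slog.LevelInfo, "osquery instance exited", "err", err)
}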
if o.opts.loggerPluginFlag == "" { - logString := func(ctx context.Context, typ logger.LogType, logText string) error { + logString := func(ctx context.Context, typ osquerylogger.LogType, logText string) error { return nil } - o.opts.extensionPlugins = append(o.opts.extensionPlugins, logger.NewPlugin("internal_noop", logString)) + o.opts.extensionPlugins = append(o.opts.extensionPlugins, osquerylogger.NewPlugin("internal_noop", logString)) o.opts.loggerPluginFlag = "internal_noop" } @@ -326,8 +330,8 @@ func (r *Runner) launchOsqueryInstance() error { var currentOsquerydBinaryPath string currentOsquerydBinary, err := tuf.CheckOutLatest(ctx, "osqueryd", o.opts.rootDirectory, o.opts.updateDirectory, o.opts.updateChannel, o.logger) if err != nil { - level.Debug(o.logger).Log( - "msg", "could not get latest version of osqueryd from new autoupdate library, falling back", + r.slogger.Log(ctx, slog.LevelDebug, + "could not get latest version of osqueryd from new autoupdate library, falling back", "err", err, ) currentOsquerydBinaryPath = autoupdate.FindNewest( @@ -352,17 +356,17 @@ func (r *Runner) launchOsqueryInstance() error { // Assign a PGID that matches the PID. This lets us kill the entire process group later. o.cmd.SysProcAttr = setpgid() - level.Info(o.logger).Log( - "msg", "launching osqueryd", - "arg0", o.cmd.Path, + r.slogger.Log(ctx, slog.LevelInfo, + "launching osqueryd", + "path", o.cmd.Path, "args", strings.Join(o.cmd.Args, " "), ) // remove any socket already at the extension socket path to ensure // that it's not left over from a previous instance if err := os.RemoveAll(paths.extensionSocketPath); err != nil { - level.Info(o.logger).Log( - "msg", "error removing osquery extension socket", + r.slogger.Log(ctx, slog.LevelWarn, + "error removing osquery extension socket", "path", paths.extensionSocketPath, "err", err, ) @@ -376,17 +380,22 @@ func (r *Runner) launchOsqueryInstance() error { // update system and falling back to an earlier version. msgPairs := append( getOsqueryInfoForLog(o.cmd.Path), - "msg", "Fatal error starting osquery. Could not exec.", "err", err, ) - level.Info(o.logger).Log(msgPairs...) + r.slogger.Log(ctx, slog.LevelWarn, + "fatal error starting osquery -- could not exec.", + msgPairs..., + ) traces.SetError(span, fmt.Errorf("fatal error starting osqueryd process: %w", err)) return fmt.Errorf("fatal error starting osqueryd process: %w", err) } span.AddEvent("launched_osqueryd") - level.Info(o.logger).Log("msg", "launched osquery process", "osqueryd_pid", o.cmd.Process.Pid) + r.slogger.Log(ctx, slog.LevelInfo, + "launched osquery process", + "osqueryd_pid", o.cmd.Process.Pid, + ) // wait for osquery to create the socket before moving on, // this is intended to serve as a kind of health check @@ -395,7 +404,10 @@ func (r *Runner) launchOsqueryInstance() error { if err := backoff.WaitFor(func() error { _, err := os.Stat(paths.extensionSocketPath) if err != nil { - level.Debug(o.logger).Log("msg", "osquery extension socket not created yet ... will retry", "path", paths.extensionSocketPath) + r.slogger.Log(ctx, slog.LevelDebug, + "osquery extension socket not created yet ... 
will retry", + "path", paths.extensionSocketPath, + ) } return err }, 1*time.Minute, 1*time.Second); err != nil { @@ -407,7 +419,10 @@ func (r *Runner) launchOsqueryInstance() error { stats, err := history.NewInstance() if err != nil { - level.Info(o.logger).Log("msg", fmt.Sprint("osquery instance history error: ", err.Error())) + r.slogger.Log(ctx, slog.LevelWarn, + "could not create new osquery instance history", + "err", err, + ) } o.stats = stats @@ -415,39 +430,56 @@ func (r *Runner) launchOsqueryInstance() error { // successfully started. ("successful" is independent of exit // code. eg: this runs if we could exec. Failure to exec is above.) o.errgroup.Go(func() error { - defer level.Info(o.logger).Log("msg", "exiting errgroup", "errgroup", "monitor osquery process") + defer r.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "monitor osquery process", + ) err := o.cmd.Wait() switch { case err == nil, isExitOk(err): - level.Info(o.logger).Log("msg", "osquery exited successfully") + r.slogger.Log(ctx, slog.LevelInfo, + "osquery exited successfully", + ) // TODO: should this return nil? return errors.New("osquery process exited successfully") default: msgPairs := append( getOsqueryInfoForLog(o.cmd.Path), - "msg", "Error running osquery command", "err", err, ) - level.Info(o.logger).Log(msgPairs...) + r.slogger.Log(ctx, slog.LevelWarn, + "error running osquery command", + msgPairs..., + ) return fmt.Errorf("running osqueryd command: %w", err) } }) // Kill osquery process on shutdown o.errgroup.Go(func() error { - defer level.Info(o.logger).Log("msg", "exiting errgroup", "errgroup", "kill osquery process on shutdown") + defer r.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "kill osquery process on shutdown", + ) <-o.doneCtx.Done() - level.Debug(o.logger).Log("msg", "Starting osquery shutdown") + r.slogger.Log(ctx, slog.LevelDebug, + "starting osquery shutdown", + ) if o.cmd.Process != nil { // kill osqueryd and children if err := killProcessGroup(o.cmd); err != nil { if strings.Contains(err.Error(), "process already finished") || strings.Contains(err.Error(), "no such process") { - level.Debug(o.logger).Log("msg", "tried to stop osquery, but process already gone") + r.slogger.Log(ctx, slog.LevelDebug, + "tried to stop osquery, but process already gone", + ) } else { - level.Info(o.logger).Log("msg", "killing osquery process", "err", err) + r.slogger.Log(ctx, slog.LevelWarn, + "error killing osquery process", + "err", err, + ) } } } @@ -478,7 +510,10 @@ func (r *Runner) launchOsqueryInstance() error { if len(o.opts.extensionPlugins) > 0 { if err := o.StartOsqueryExtensionManagerServer("kolide_grpc", paths.extensionSocketPath, o.extensionManagerClient, o.opts.extensionPlugins); err != nil { - level.Info(o.logger).Log("msg", "Unable to create initial extension server. Stopping", "err", err) + r.slogger.Log(ctx, slog.LevelInfo, + "unable to create initial extension server, stopping", + "err", err, + ) traces.SetError(span, fmt.Errorf("could not create an extension server: %w", err)) return fmt.Errorf("could not create an extension server: %w", err) } @@ -486,7 +521,10 @@ func (r *Runner) launchOsqueryInstance() error { } if err := o.stats.Connected(o); err != nil { - level.Info(o.logger).Log("msg", "osquery instance history", "error", err) + r.slogger.Log(ctx, slog.LevelWarn, + "could not set connection time for osquery instance history", + "err", err, + ) } // Now spawn an extension manage to for the tables. 
We need to @@ -497,7 +535,10 @@ func (r *Runner) launchOsqueryInstance() error { // TODO: Consider chunking, if we find we can only have so // many tables per extension manager o.errgroup.Go(func() error { - defer level.Info(o.logger).Log("msg", "exiting errgroup", "errgroup", "kolide extension manager server launch") + defer r.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "kolide extension manager server launch", + ) plugins := table.PlatformTables(o.logger, currentOsquerydBinaryPath) @@ -506,7 +547,10 @@ func (r *Runner) launchOsqueryInstance() error { } if err := o.StartOsqueryExtensionManagerServer("kolide", paths.extensionSocketPath, o.extensionManagerClient, plugins); err != nil { - level.Info(o.logger).Log("msg", "Unable to create tables extension server. Stopping", "err", err) + r.slogger.Log(ctx, slog.LevelWarn, + "unable to create tables extension server, stopping", + "err", err, + ) return fmt.Errorf("could not create a table extension server: %w", err) } return nil @@ -514,13 +558,20 @@ func (r *Runner) launchOsqueryInstance() error { // Health check on interval o.errgroup.Go(func() error { - defer level.Info(o.logger).Log("msg", "exiting errgroup", "errgroup", "health check on interval") + defer r.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "health check on interval", + ) if o.knapsack != nil && o.knapsack.OsqueryHealthcheckStartupDelay() != 0*time.Second { - level.Debug(o.logger).Log("msg", "entering delay before starting osquery healthchecks") + r.slogger.Log(ctx, slog.LevelDebug, + "entering delay before starting osquery healthchecks", + ) select { case <-time.After(o.knapsack.OsqueryHealthcheckStartupDelay()): - level.Debug(o.logger).Log("msg", "exiting delay before starting osquery healthchecks") + r.slogger.Log(ctx, slog.LevelDebug, + "exiting delay before starting osquery healthchecks", + ) case <-o.doneCtx.Done(): return o.doneCtx.Err() } @@ -550,17 +601,28 @@ func (r *Runner) launchOsqueryInstance() error { if err == nil { // err was nil, clear failed attempts if i > 1 { - level.Debug(o.logger).Log("msg", "Health check passed. Clearing error", "attempt", i) + r.slogger.Log(ctx, slog.LevelDebug, + "healthcheck passed, clearing error", + "attempt", i, + ) } break } if i == maxHealthChecks { - level.Info(o.logger).Log("msg", "Health check failed. Giving up", "attempt", i, "err", err) + r.slogger.Log(ctx, slog.LevelInfo, + "healthcheck failed, giving up", + "attempt", i, + "err", err, + ) return fmt.Errorf("health check failed: %w", err) } - level.Debug(o.logger).Log("msg", "Health check failed. 
Will retry", "attempt", i, "err", err) + r.slogger.Log(ctx, slog.LevelDebug, + "healthcheck failed, will retry", + "attempt", i, + "err", err, + ) time.Sleep(1 * time.Second) } } @@ -569,7 +631,10 @@ func (r *Runner) launchOsqueryInstance() error { // Cleanup temp dir o.errgroup.Go(func() error { - defer level.Info(o.logger).Log("msg", "exiting errgroup", "errgroup", "cleanup temp dir") + defer r.slogger.Log(ctx, slog.LevelInfo, + "exiting errgroup", + "errgroup", "cleanup temp dir", + ) <-o.doneCtx.Done() if o.usingTempDir && o.rmRootDirectory != nil { diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index cac4182a3..6567a3223 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -5,7 +5,9 @@ package runtime import ( "context" + "errors" "fmt" + "log/slog" "os" "os/exec" "path/filepath" @@ -23,6 +25,7 @@ import ( "github.com/kolide/launcher/ee/agent/storage" storageci "github.com/kolide/launcher/ee/agent/storage/ci" typesMocks "github.com/kolide/launcher/ee/agent/types/mocks" + "github.com/kolide/launcher/pkg/log/multislogger" "github.com/kolide/launcher/pkg/osquery/runtime/history" "github.com/kolide/launcher/pkg/packaging" "github.com/kolide/launcher/pkg/threadsafebuffer" @@ -342,15 +345,16 @@ func TestBadBinaryPath(t *testing.T) { k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + k.On("Slogger").Return(multislogger.New().Logger) + k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary("/foobar"), ) - assert.Error(t, err) - assert.Nil(t, runner) + assert.Error(t, runner.Run()) k.AssertExpectations(t) } @@ -365,17 +369,20 @@ func TestWithOsqueryFlags(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), WithOsqueryFlags([]string{"verbose=false"}), ) - require.NoError(t, err) + go runner.Run() + waitHealthy(t, runner) assert.Equal(t, []string{"verbose=false"}, runner.instance.opts.osqueryFlags) + + runner.Interrupt(errors.New("test error")) } func TestFlagsChanged(t *testing.T) { @@ -394,16 +401,21 @@ func TestFlagsChanged(t *testing.T) { k.On("WatchdogUtilizationLimitPercent").Return(20) k.On("WatchdogDelaySec").Return(120) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + // Start the runner + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), WithOsqueryFlags([]string{"verbose=false"}), ) - require.NoError(t, err) + go runner.Run() + + // Wait for the instance to start + time.Sleep(2 * time.Second) + waitHealthy(t, runner) 
// Confirm watchdog is disabled watchdogDisabled := false @@ -457,6 +469,8 @@ func TestFlagsChanged(t *testing.T) { require.True(t, watchdogDelaySecFound, "watchdog delay sec not set") k.AssertExpectations(t) + + runner.Interrupt(errors.New("test error")) } // waitHealthy expects the instance to be healthy within 30 seconds, or else @@ -479,15 +493,15 @@ func TestSimplePath(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), ) - require.NoError(t, err) + go runner.Run() waitHealthy(t, runner) @@ -507,15 +521,15 @@ func TestMultipleShutdowns(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), ) - require.NoError(t, err) + go runner.Run() waitHealthy(t, runner) @@ -562,14 +576,15 @@ func TestOsqueryDies(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), ) + go runner.Run() require.NoError(t, err) waitHealthy(t, runner) @@ -652,17 +667,17 @@ func TestExtensionSocketPath(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(multislogger.New().Logger) extensionSocketPath := filepath.Join(rootDirectory, "sock") - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithExtensionSocketPath(extensionSocketPath), WithOsquerydBinary(testOsqueryBinaryDirectory), ) - require.NoError(t, err) + go runner.Run() waitHealthy(t, runner) @@ -677,6 +692,8 @@ func TestExtensionSocketPath(t *testing.T) { require.NoError(t, err) assert.Equal(t, int32(0), resp.Status.Code) assert.Equal(t, "OK", resp.Status.Message) + + require.NoError(t, runner.Shutdown()) } func TestOsquerySlowStart(t *testing.T) { @@ -691,10 +708,11 @@ func TestOsquerySlowStart(t *testing.T) { k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + slogger := multislogger.New(slog.NewJSONHandler(&logBytes, &slog.HandlerOptions{Level: slog.LevelDebug})) + 
k.On("Slogger").Return(slogger.Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err := LaunchInstance( - cancel, + runner := New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), @@ -714,7 +732,7 @@ func TestOsquerySlowStart(t *testing.T) { return nil }), ) - require.NoError(t, err) + go runner.Run() waitHealthy(t, runner) // ensure that we actually had to wait on the socket @@ -743,15 +761,15 @@ func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, teardown func() k.On("WatchdogUtilizationLimitPercent").Return(20) k.On("WatchdogDelaySec").Return(120) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() + k.On("Slogger").Return(multislogger.New().Logger) - _, cancel := context.WithCancel(context.TODO()) - runner, err = LaunchInstance( - cancel, + runner = New( + k, WithKnapsack(k), WithRootDirectory(rootDirectory), WithOsquerydBinary(testOsqueryBinaryDirectory), ) - require.NoError(t, err) + go runner.Run() waitHealthy(t, runner) osqueryPID := runner.instance.cmd.Process.Pid diff --git a/pkg/traces/exporter/exporter.go b/pkg/traces/exporter/exporter.go index 5b74e712c..1dd72a3e8 100644 --- a/pkg/traces/exporter/exporter.go +++ b/pkg/traces/exporter/exporter.go @@ -10,7 +10,6 @@ import ( "github.com/kolide/launcher/ee/agent/flags/keys" "github.com/kolide/launcher/ee/agent/storage" "github.com/kolide/launcher/ee/agent/types" - "github.com/kolide/launcher/pkg/osquery" "github.com/kolide/launcher/pkg/traces" "github.com/kolide/launcher/pkg/traces/bufspanprocessor" osquerygotraces "github.com/osquery/osquery-go/traces" @@ -115,7 +114,7 @@ func NewTraceExporter(ctx context.Context, k types.Knapsack, initialTraceBuffer return t, nil } -func (t *TraceExporter) SetOsqueryClient(client osquery.Querier) { +func (t *TraceExporter) SetOsqueryClient(client querier) { t.osqueryClient = client go t.addAttributesFromOsquery()