diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index f5c9ce47fbf..8e9e2a6bd15 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -15,6 +15,7 @@ import ( "github.com/gofrs/uuid" lru "github.com/hashicorp/golang-lru" + "golang.org/x/sync/semaphore" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -35,6 +36,8 @@ var ( const ( scheduledOsqueriesTypesCacheSize = 256 // Default number of queries types kept in memory to avoid fetching GetQueryColumns all the time adhocOsqueriesTypesCacheSize = 256 // The final cache size equals the number of periodic queries plus this value, in order to have additional cache for ad-hoc queries + + limitQueryAtTime = 1 // Always run only one osquery query at a time. Addresses the issue: https://github.com/elastic/beats/issues/25297 ) // osquerybeat configuration. @@ -49,6 +52,9 @@ type osquerybeat struct { // Beat lifecycle context, cancelled on Stop cancel context.CancelFunc mx sync.Mutex + + // limiter to run one query at a time + limitSem *semaphore.Weighted } // New creates an instance of osquerybeat. @@ -61,9 +67,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { } bt := &osquerybeat{ - b: b, - config: c, - log: log, + b: b, + config: c, + log: log, + limitSem: semaphore.NewWeighted(limitQueryAtTime), } return bt, nil @@ -309,7 +316,24 @@ func (bt *osquerybeat) query(ctx context.Context, q interface{}) error { return nil } -func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index, id, query, responseID string, req map[string]interface{}) error { +func (bt *osquerybeat) executeQueryWithLimiter(ctx context.Context, log *logp.Logger, query string) ([]map[string]interface{}, error) { + // This limits the execution of query to one at a time. + // Concurrent use of osqueryd socket lead to failures/errors. + // Example: osquery failed: *osquery.ExtensionResponse error reading struct: error reading field 0: read unix ->/var/run/404419649/osquery.sock: i/o timeout" + // The scheduled and ad-hoc queries use the same code path at the moment. + // The plan for the next release is to switch the scheduled queries to use osqueryd scheduler instead. + err := bt.limitSem.Acquire(ctx, limitQueryAtTime) + if err != nil { + return nil, err + } + defer bt.limitSem.Release(limitQueryAtTime) + + // "If ctx is already done, Acquire may still succeed without blocking." + // https://github.com/golang/sync/blob/master/semaphore/semaphore.go#L68 + if ctx.Err() != nil { + return nil, err + } + log.Debugf("Execute query: %s", query) start := time.Now() @@ -318,10 +342,19 @@ func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index if err != nil { log.Errorf("Failed to execute query, err: %v", err) - return err + return nil, err } log.Infof("Completed query in: %v", time.Since(start)) + return hits, nil +} + +func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index, id, query, responseID string, req map[string]interface{}) error { + + hits, err := bt.executeQueryWithLimiter(ctx, log, query) + if err != nil { + return err + } for _, hit := range hits { reqData := req["data"]