Skip to content

Commit

Permalink
Serialize queries execution over osqueryd socket (elastic#25298)
Browse files Browse the repository at this point in the history
* Serialize queries execution over osqueryd socket

* Updated to use sem
  • Loading branch information
aleksmaus committed Apr 26, 2021
1 parent 3d16785 commit 0ca7b94
Showing 1 changed file with 38 additions and 5 deletions.
43 changes: 38 additions & 5 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/gofrs/uuid"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/semaphore"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -35,6 +36,8 @@ var (
const (
scheduledOsqueriesTypesCacheSize = 256 // Default number of queries types kept in memory to avoid fetching GetQueryColumns all the time
adhocOsqueriesTypesCacheSize = 256 // The final cache size equals the number of periodic queries plus this value, in order to have additional cache for ad-hoc queries

limitQueryAtTime = 1 // Always run only one osquery query at a time. Addresses the issue: https://github.com/elastic/beats/issues/25297
)

// osquerybeat configuration.
Expand All @@ -49,6 +52,9 @@ type osquerybeat struct {
// Beat lifecycle context, cancelled on Stop
cancel context.CancelFunc
mx sync.Mutex

// limiter to run one query at a time
limitSem *semaphore.Weighted
}

// New creates an instance of osquerybeat.
Expand All @@ -61,9 +67,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
}

bt := &osquerybeat{
b: b,
config: c,
log: log,
b: b,
config: c,
log: log,
limitSem: semaphore.NewWeighted(limitQueryAtTime),
}

return bt, nil
Expand Down Expand Up @@ -309,7 +316,24 @@ func (bt *osquerybeat) query(ctx context.Context, q interface{}) error {
return nil
}

func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index, id, query, responseID string, req map[string]interface{}) error {
func (bt *osquerybeat) executeQueryWithLimiter(ctx context.Context, log *logp.Logger, query string) ([]map[string]interface{}, error) {
// This limits the execution of query to one at a time.
// Concurrent use of osqueryd socket lead to failures/errors.
// Example: osquery failed: *osquery.ExtensionResponse error reading struct: error reading field 0: read unix ->/var/run/404419649/osquery.sock: i/o timeout"
// The scheduled and ad-hoc queries use the same code path at the moment.
// The plan for the next release is to switch the scheduled queries to use osqueryd scheduler instead.
err := bt.limitSem.Acquire(ctx, limitQueryAtTime)
if err != nil {
return nil, err
}
defer bt.limitSem.Release(limitQueryAtTime)

// "If ctx is already done, Acquire may still succeed without blocking."
// https://github.com/golang/sync/blob/master/semaphore/semaphore.go#L68
if ctx.Err() != nil {
return nil, err
}

log.Debugf("Execute query: %s", query)

start := time.Now()
Expand All @@ -318,10 +342,19 @@ func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index

if err != nil {
log.Errorf("Failed to execute query, err: %v", err)
return err
return nil, err
}

log.Infof("Completed query in: %v", time.Since(start))
return hits, nil
}

func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index, id, query, responseID string, req map[string]interface{}) error {

hits, err := bt.executeQueryWithLimiter(ctx, log, query)
if err != nil {
return err
}

for _, hit := range hits {
reqData := req["data"]
Expand Down

0 comments on commit 0ca7b94

Please sign in to comment.