Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement distributed execution #139

Merged
merged 10 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package api

import (
"time"

"github.com/prometheus/prometheus/promql"
)

type RemoteEndpoints interface {
Engines() []RemoteEngine
}

type RemoteEngine interface {
NewInstantQuery(opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
}

type staticEndpoints struct {
engines []RemoteEngine
}

func (m staticEndpoints) Engines() []RemoteEngine {
return m.engines
}

func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints {
return &staticEndpoints{engines: engines}
}
54 changes: 52 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ package engine

import (
"context"

v1 "github.com/prometheus/prometheus/web/api/v1"

"io"
"math"
"runtime"
"sort"
"time"

"github.com/thanos-community/promql-engine/api"

"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -20,7 +25,6 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/thanos-community/promql-engine/execution"
"github.com/thanos-community/promql-engine/execution/model"
Expand Down Expand Up @@ -59,7 +63,53 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
return o.LogicalOptimizers
}

func New(opts Opts) v1.QueryEngine {
type localEngine struct {
q storage.Queryable
engine *compatibilityEngine
}

func NewLocalEngine(opts Opts, q storage.Queryable) *localEngine {
return &localEngine{
q: q,
engine: New(opts),
}
}

func (l localEngine) NewInstantQuery(opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return l.engine.NewInstantQuery(l.q, opts, qs, ts)
}

func (l localEngine) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.engine.NewRangeQuery(l.q, opts, qs, start, end, interval)
}

type distributedEngine struct {
endpoints api.RemoteEndpoints
localEngine *compatibilityEngine
}

func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) v1.QueryEngine {
opts.LogicalOptimizers = append(
opts.LogicalOptimizers,
logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints},
)
return &distributedEngine{
endpoints: endpoints,
localEngine: New(opts),
}
}

func (l distributedEngine) SetQueryLogger(log promql.QueryLogger) {}

func (l distributedEngine) NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return l.localEngine.NewInstantQuery(q, opts, qs, ts)
}

func (l distributedEngine) NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.localEngine.NewRangeQuery(q, opts, qs, start, end, interval)
}

func New(opts Opts) *compatibilityEngine {
if opts.Logger == nil {
opts.Logger = log.NewNopLogger()
}
Expand Down
106 changes: 106 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/thanos-community/promql-engine/api"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -1226,6 +1228,92 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
}
}

func TestDistributedAggregations(t *testing.T) {
localOpts := engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
},
}

start := time.Unix(0, 0)
end := time.Unix(120, 0)
step := time.Second * 30

ssetA := []storage.Series{
newMockSeries(
[]string{labels.MetricName, "bar", "region", "east", "pod", "nginx-1"},
[]int64{0, 30000, 60000, 90000, 120000},
[]float64{1, 2, 3, 4, 5},
),
newMockSeries(
[]string{labels.MetricName, "bar", "region", "east", "pod", "nginx-2"},
[]int64{0, 30000, 60000, 90000, 120000},
[]float64{2, 3, 4, 5, 6},
),
}
ssetB := []storage.Series{
newMockSeries(
[]string{labels.MetricName, "bar", "region", "west-1", "pod", "nginx-1"},
[]int64{0, 30000, 60000, 90000, 120000},
[]float64{3, 4, 5, 6, 7},
),
newMockSeries(
[]string{labels.MetricName, "bar", "region", "west-2", "pod", "nginx-1"},
[]int64{0, 30000, 60000, 90000, 120000},
[]float64{4, 5, 6, 7, 8},
),
newMockSeries(
[]string{labels.MetricName, "bar", "region", "west-1", "pod", "nginx-2"},
[]int64{0, 30000, 60000, 90000, 120000},
[]float64{5, 6, 7, 8, 9},
),
}

queries := []struct {
name string
query string
expectFallback bool
}{
{name: "sum", query: `sum by (pod) (bar)`},
{name: "avg", query: `avg by (pod) (bar)`},
{name: "count", query: `count by (pod) (bar)`},
{name: "group", query: `group by (pod) (bar)`},
{name: "topk", query: `topk by (pod) (1, bar)`},
{name: "bottomk", query: `bottomk by (pod) (1, bar)`},
{name: "double aggregation", query: `max by (pod) (sum by (pod) (bar))`},
{name: "aggregation with function operand", query: `sum by (pod) (rate(bar[1m]))`},
{name: "binary aggregation", query: `sum by (region) (bar) / sum by (pod) (bar)`},
{name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true},
}

allSeries := storageWithSeries(append(ssetA, ssetB...)...)
for _, tcase := range queries {
t.Run(tcase.name, func(t *testing.T) {
distOpts := localOpts
distOpts.DisableFallback = !tcase.expectFallback
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints([]api.RemoteEngine{
engine.NewLocalEngine(localOpts, storageWithSeries(ssetA...)),
engine.NewLocalEngine(localOpts, storageWithSeries(ssetB...)),
}))
distQry, err := distEngine.NewRangeQuery(allSeries, nil, tcase.query, start, end, step)
testutil.Ok(t, err)

distResult := distQry.Exec(context.Background())
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(allSeries, nil, tcase.query, start, end, step)
testutil.Ok(t, err)
promResult := promQry.Exec(context.Background())

roundValues(promResult)
roundValues(distResult)
testutil.Equals(t, promResult, distResult)
})
}
}

func TestBinopEdgeCases(t *testing.T) {
opts := promql.EngineOpts{
Timeout: 1 * time.Hour,
Expand Down Expand Up @@ -2630,3 +2718,21 @@ type samplesByLabels []promql.Sample
func (b samplesByLabels) Len() int { return len(b) }
func (b samplesByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b samplesByLabels) Less(i, j int) bool { return labels.Compare(b[i].Metric, b[j].Metric) < 0 }

// roundValues rounds all values to 10 decimal points and
// can be used to eliminate floating point division errors
// when comparing two promql results.
func roundValues(r *promql.Result) {
switch result := r.Value.(type) {
case promql.Matrix:
for i := range result {
for j := range result[i].Points {
result[i].Points[j].V = math.Floor(result[i].Points[j].V*1e10) / 1e10
}
}
case promql.Vector:
for i := range result {
result[i].V = math.Floor(result[i].V*10e10) / 10e10
}
}
}
101 changes: 73 additions & 28 deletions execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"sync"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-community/promql-engine/execution/model"
Expand All @@ -28,16 +29,18 @@ type coalesceOperator struct {
once sync.Once
series []labels.Labels

pool *model.VectorPool
mu sync.Mutex
wg sync.WaitGroup
operators []model.VectorOperator
pool *model.VectorPool
mu sync.Mutex
wg sync.WaitGroup
operators []model.VectorOperator
sampleOffsets []uint64
}

func NewCoalesce(pool *model.VectorPool, operators ...model.VectorOperator) model.VectorOperator {
return &coalesceOperator{
pool: pool,
operators: operators,
pool: pool,
operators: operators,
sampleOffsets: make([]uint64, len(operators)),
}
}

Expand Down Expand Up @@ -65,11 +68,17 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error)
default:
}

var err error
c.once.Do(func() { err = c.loadSeries(ctx) })
if err != nil {
return nil, err
}

var out []model.StepVector = nil
var errChan = make(errorChan, len(c.operators))
for _, o := range c.operators {
for idx, o := range c.operators {
c.wg.Add(1)
go func(o model.VectorOperator) {
go func(opIdx int, o model.VectorOperator) {
defer c.wg.Done()

in, err := o.Next(ctx)
Expand All @@ -80,6 +89,13 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error)
if in == nil {
return
}

for _, vector := range in {
for i := range vector.SampleIDs {
vector.SampleIDs[i] += c.sampleOffsets[opIdx]
}
}

c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -91,12 +107,16 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error)
}

for i := 0; i < len(in); i++ {
if len(in[i].Samples) > 0 {
out[i].T = in[i].T
}

out[i].Samples = append(out[i].Samples, in[i].Samples...)
out[i].SampleIDs = append(out[i].SampleIDs, in[i].SampleIDs...)
o.GetPool().PutStepVector(in[i])
}
o.GetPool().PutVectors(in)
}(o)
}(idx, o)
}
c.wg.Wait()
close(errChan)
Expand All @@ -113,29 +133,54 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error)
}

func (c *coalesceOperator) loadSeries(ctx context.Context) error {
size := 0
var wg sync.WaitGroup
var mu sync.Mutex
var numSeries uint64
allSeries := make([][]labels.Labels, len(c.operators))
errChan := make(errorChan, len(c.operators))
for i := 0; i < len(c.operators); i++ {
series, err := c.operators[i].Series(ctx)
if err != nil {
return err
}
size += len(series)
wg.Add(1)
go func(i int) {
defer wg.Done()
defer func() {
e := recover()
if e == nil {
return
}

switch err := e.(type) {
case error:
errChan <- errors.Wrapf(err, "unexpected error")
}

}()
series, err := c.operators[i].Series(ctx)
if err != nil {
errChan <- err
return
}

allSeries[i] = series
mu.Lock()
numSeries += uint64(len(series))
mu.Unlock()
}(i)
}
wg.Wait()
close(errChan)
if err := errChan.getError(); err != nil {
return err
}

idx := 0
result := make([]labels.Labels, size)
for _, o := range c.operators {
series, err := o.Series(ctx)
if err != nil {
return err
}
for i := 0; i < len(series); i++ {
result[idx] = series[i]
idx++
}
var offset uint64
c.sampleOffsets = make([]uint64, len(c.operators))
c.series = make([]labels.Labels, 0, numSeries)
for i, series := range allSeries {
c.sampleOffsets[i] = offset
c.series = append(c.series, series...)
offset += uint64(len(series))
}
c.series = result
c.pool.SetStepSize(len(c.series))

c.pool.SetStepSize(len(c.series))
return nil
}
Loading