Skip to content

Commit

Permalink
Add AlertBackfill functionality to promxy
Browse files Browse the repository at this point in the history
This optional config flag will enable promxy to backfill alert status by
re-querying the alert statement on startup -- in the event that the
downstream store doesn't have the corresponding ALERTS_FOR_STATE series.

Fixes #50
  • Loading branch information
jacksontj committed Aug 19, 2023
1 parent f032b91 commit 7d7deea
Show file tree
Hide file tree
Showing 12 changed files with 843 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/promxy/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test-robust:
go build -race -mod=vendor -tags netgo,builtinassets && ./promxy --log-level=debug --http.shutdown-delay=0s --config=demo_robust.conf --web.external-url=http://localhost:8082/promxy
go build -race -mod=vendor -tags netgo,builtinassets && ./promxy --rules.alertbackfill --log-level=debug --http.shutdown-delay=0s --config=demo_robust.conf --web.external-url=http://localhost:8082/promxy


test-local:
Expand Down
8 changes: 7 additions & 1 deletion cmd/promxy/alert_example.rule
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ groups:
severity: page
annotations:
summary: High request latency

- alert: testAlert
expr: prometheus_build_info == 1
for: 10m
labels:
severity: page
annotations:
summary: example always-firing alert
6 changes: 6 additions & 0 deletions cmd/promxy/demo_robust.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ global:
external_labels:
source: promxy

# Rule files specifies a list of globs. Rules and alerts are read from
# all matching files.
rule_files:
- "*rule"


##
### Promxy configuration
##
Expand Down
18 changes: 16 additions & 2 deletions cmd/promxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"go.uber.org/atomic"
"k8s.io/klog"

"github.com/jacksontj/promxy/pkg/alertbackfill"
proxyconfig "github.com/jacksontj/promxy/pkg/config"
"github.com/jacksontj/promxy/pkg/logging"
"github.com/jacksontj/promxy/pkg/middleware"
Expand Down Expand Up @@ -104,6 +105,7 @@ type cliOpts struct {
ForOutageTolerance time.Duration `long:"rules.alert.for-outage-tolerance" description:"Max time to tolerate prometheus outage for restoring for state of alert." default:"1h"`
ForGracePeriod time.Duration `long:"rules.alert.for-grace-period" description:"Minimum duration between alert and restored for state. This is maintained only for alerts with configured for time greater than grace period." default:"10m"`
ResendDelay time.Duration `long:"rules.alert.resend-delay" description:"Minimum amount of time to wait before resending an alert to Alertmanager." default:"1m"`
AlertBackfill bool `long:"rules.alertbackfill" description:"Enable promxy to recalculate alert state on startup when the downstream datastore doesn't have an ALERTS_FOR_STATE"`

ShutdownDelay time.Duration `long:"http.shutdown-delay" description:"time to wait before shutting down the http server, this allows for a grace period for upstreams (e.g. LoadBalancers) to discover the new stopping status through healthchecks" default:"10s"`
ShutdownTimeout time.Duration `long:"http.shutdown-timeout" description:"max time to wait for a graceful shutdown of the HTTP server" default:"60s"`
Expand Down Expand Up @@ -307,19 +309,31 @@ func main() {
logrus.Infof("Notifier manager stopped")
}()

var ruleQueryable storage.Queryable
// If alertbackfill is enabled; wire it up!
if opts.AlertBackfill {
ruleQueryable = alertbackfill.NewAlertBackfillQueryable(engine, proxyStorage)
} else {
ruleQueryable = proxyStorage
}
ruleManager := rules.NewManager(&rules.ManagerOptions{
Context: ctx, // base context for all background tasks
ExternalURL: externalUrl, // URL listed as URL for "who fired this alert"
QueryFunc: rules.EngineQueryFunc(engine, proxyStorage),
NotifyFunc: sendAlerts(notifierManager, externalUrl.String()),
Appendable: proxyStorage,
Queryable: proxyStorage,
Queryable: ruleQueryable,
Logger: logger,
Registerer: prometheus.DefaultRegisterer,
OutageTolerance: opts.ForOutageTolerance,
ForGracePeriod: opts.ForGracePeriod,
ResendDelay: opts.ResendDelay,
})

if q, ok := ruleQueryable.(*alertbackfill.AlertBackfillQueryable); ok {
q.SetRuleGroupFetcher(ruleManager.RuleGroups)
}

go ruleManager.Run()

reloadables = append(reloadables, proxyconfig.WrapPromReloadable(&proxyconfig.ApplyConfigFunc{func(cfg *config.Config) error {
Expand Down Expand Up @@ -413,7 +427,7 @@ func main() {
r.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Have our fallback rules
if strings.HasPrefix(r.URL.Path, path.Join(webOptions.RoutePrefix, "/debug")) {
http.DefaultServeMux.ServeHTTP(w, r)
http.StripPrefix(webOptions.RoutePrefix, http.DefaultServeMux).ServeHTTP(w, r)
} else if r.URL.Path == path.Join(webOptions.RoutePrefix, "/-/ready") {
if stopping {
w.WriteHeader(http.StatusServiceUnavailable)
Expand Down
7 changes: 0 additions & 7 deletions cmd/promxy/recording_example.rule

This file was deleted.

72 changes: 72 additions & 0 deletions pkg/alertbackfill/alertstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package alertbackfill

import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

// GenerateAlertStateMatrix will generate the a model.Matrix which is the equivalent
// of the `ALERTS_FOR_STATE` series that would have been generated for the given data
func GenerateAlertStateMatrix(v model.Matrix, matchers []*labels.Matcher, alertLabels labels.Labels, step time.Duration) model.Matrix {
matrix := make(model.Matrix, 0, v.Len())
MATRIXSAMPLE_LOOP:
for _, item := range v {
// clone the metric -- and convert the labels
metric := item.Metric.Clone()

// Add the labels which the alert would add
for _, label := range alertLabels {
metric[model.LabelName(label.Name)] = model.LabelValue(label.Value)
}

// Filter to results that match our matchers
for _, matcher := range matchers {
switch matcher.Name {
// Overwrite the __name__ and alertname
case model.MetricNameLabel, model.AlertNameLabel:
metric[model.LabelName(matcher.Name)] = model.LabelValue(matcher.Value)
default:
if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) {
continue MATRIXSAMPLE_LOOP
}
}
}

var (
activeAt model.SampleValue
lastPoint time.Time
)
// Now we have to convert the *actual* result into the series that is stored for the ALERTS
samples := make([]model.SamplePair, len(item.Values))
for x, sample := range item.Values {
sampleTime := sample.Timestamp.Time()

// If we are missing a point in the matrix; then we are going to assume
// that the series cleared, so we need to reset activeAt
if sampleTime.Sub(lastPoint) > step {
activeAt = 0
}
lastPoint = sampleTime

// if there is no `activeAt` set; lets set this timestamp (earliest timestamp in the steps that has a point)
if activeAt == 0 {
activeAt = model.SampleValue(sample.Timestamp.Unix())
}

samples[x] = model.SamplePair{
Timestamp: sample.Timestamp,
// The timestamp is a unix timestapm of ActiveAt, so we'll set this to the timestamp instead of the value
Value: activeAt,
}
}

matrix = append(matrix, &model.SampleStream{
Metric: metric,
Values: samples,
})
}

return matrix
}
187 changes: 187 additions & 0 deletions pkg/alertbackfill/alertstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package alertbackfill

import (
"strconv"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

func TestGenerateAlertStateMatrix(t *testing.T) {
start := model.Time(0).Add(time.Minute)
tests := []struct {
in model.Matrix
matchers []*labels.Matcher
alertLabels labels.Labels
step time.Duration

out model.Matrix
}{
// Simple test case
{
in: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
"__name__": "prometheus_build_info",
"job": "prometheus",
"replica": "a",
},
Values: []model.SamplePair{
{start.Add(time.Minute * 0), 1},
{start.Add(time.Minute * 1), 1},
{start.Add(time.Minute * 2), 1},
{start.Add(time.Minute * 3), 1},
{start.Add(time.Minute * 4), 1},
// Have a gap!
{start.Add(time.Minute * 6), 1},
},
},
&model.SampleStream{
Metric: model.Metric{
"__name__": "prometheus_build_info",
"job": "prometheus",
"replica": "b",
},
Values: []model.SamplePair{
{start.Add(time.Minute * 0), 1},
{start.Add(time.Minute * 1), 1},
{start.Add(time.Minute * 2), 1},
{start.Add(time.Minute * 3), 1},
{start.Add(time.Minute * 4), 1},
},
},
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"),
labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "testalert"),
labels.MustNewMatcher(labels.MatchEqual, "replica", "a"),
},
alertLabels: labels.Labels{
labels.Label{"severity", "page"},
},
step: time.Minute,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
"__name__": "ALERTS_FOR_STATE",
"alertname": "testalert",
"job": "prometheus",
"replica": "a",
"severity": "page",
},
Values: []model.SamplePair{
{start.Add(time.Minute * 0), model.SampleValue(start.Unix())},
{start.Add(time.Minute * 1), model.SampleValue(start.Unix())},
{start.Add(time.Minute * 2), model.SampleValue(start.Unix())},
{start.Add(time.Minute * 3), model.SampleValue(start.Unix())},
{start.Add(time.Minute * 4), model.SampleValue(start.Unix())},
{start.Add(time.Minute * 6), model.SampleValue(start.Add(time.Minute * 6).Unix())},
},
},
},
},

// Example from `prometheus_build_info == 1`
{
in: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
"__name__": "prometheus_build_info",
"branch": "HEAD",
"goversion": "go1.16.4",
"instance": "demo.do.prometheus.io:9090",
"job": "prometheus",
"replica": "a",
"revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53",
"version": "2.27.0",
},
Values: []model.SamplePair{
{model.TimeFromUnix(1692419110), 1},
{model.TimeFromUnix(1692419115), 1},
{model.TimeFromUnix(1692419120), 1},
{model.TimeFromUnix(1692419125), 1},
{model.TimeFromUnix(1692419130), 1},
},
},
&model.SampleStream{
Metric: model.Metric{
"__name__": "prometheus_build_info",
"branch": "HEAD",
"goversion": "go1.16.4",
"instance": "demo.do.prometheus.io:9090",
"job": "prometheus",
"replica": "b",
"revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53",
"version": "2.27.0",
},
Values: []model.SamplePair{
{model.TimeFromUnix(1692419110), 1},
{model.TimeFromUnix(1692419115), 1},
{model.TimeFromUnix(1692419120), 1},
{model.TimeFromUnix(1692419125), 1},
{model.TimeFromUnix(1692419130), 1},
},
},
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"),
labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "testAlert"),
labels.MustNewMatcher(labels.MatchEqual, "branch", "HEAD"),
labels.MustNewMatcher(labels.MatchEqual, "goversion", "go1.16.4"),
labels.MustNewMatcher(labels.MatchEqual, "instance", "demo.do.prometheus.io:9090"),
labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
labels.MustNewMatcher(labels.MatchEqual, "replica", "a"),
labels.MustNewMatcher(labels.MatchEqual, "revision", "24c9b61221f7006e87cd62b9fe2901d43e19ed53"),
labels.MustNewMatcher(labels.MatchEqual, "severity", "pageMORE"),
labels.MustNewMatcher(labels.MatchEqual, "version", "2.27.0"),
},
alertLabels: labels.Labels{
labels.Label{"severity", "pageMORE"},
},
step: time.Second * 5,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
"__name__": "ALERTS_FOR_STATE",
"alertname": "testAlert",
"branch": "HEAD",
"goversion": "go1.16.4",
"instance": "demo.do.prometheus.io:9090",
"job": "prometheus",
"replica": "a",
"revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53",
"version": "2.27.0",
"severity": "pageMORE",
},
Values: []model.SamplePair{
{model.TimeFromUnix(1692419110), 1692419110},
{model.TimeFromUnix(1692419115), 1692419110},
{model.TimeFromUnix(1692419120), 1692419110},
{model.TimeFromUnix(1692419125), 1692419110},
{model.TimeFromUnix(1692419130), 1692419110},
},
},
},
},
}

for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
out := GenerateAlertStateMatrix(test.in, test.matchers, test.alertLabels, test.step)

if test.out.String() != out.String() {
t.Fatalf("mismatch in series expected=%v actual=%v", test.out, out)
}

for i, sampleStream := range out {
for _, matcher := range test.matchers {
if !matcher.Matches(string(sampleStream.Metric[model.LabelName(matcher.Name)])) {
t.Fatalf("out series=%d label %s=%s doesn't match matcher %v", i, matcher.Name, sampleStream.Metric[model.LabelName(matcher.Name)], matcher.Value)
}
}
}
})
}
}
Loading

0 comments on commit 7d7deea

Please sign in to comment.