Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testutil/promrated: update promrated querying #2844

Merged
merged 7 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 18 additions & 38 deletions testutil/promrated/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,13 @@ import (
"github.com/obolnetwork/charon/app/promauto"
)

var (
validatorLabels = []string{"pubkey_full", "cluster_name", "cluster_hash", "cluster_network"}
networkLabels = []string{"cluster_network"}

uptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_uptime",
Help: "Uptime of a validation key.",
}, validatorLabels)

correctness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_correctness",
Help: "Average correctness of a validation key.",
}, validatorLabels)

inclusionDelay = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_inclusion_delay",
Help: "Average inclusion delay of a validation key.",
}, validatorLabels)

attester = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_attester_effectiveness",
Help: "Attester effectiveness of a validation key.",
}, validatorLabels)

proposer = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_proposer_effectiveness",
Help: "Proposer effectiveness of a validation key.",
}, validatorLabels)
const (
clusterNetworkLabel = "cluster_network"
nodeOperatorLabel = "node_operator"
)

effectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_effectiveness",
Help: "Effectiveness of a validation key.",
}, validatorLabels)
var (
networkLabels = []string{clusterNetworkLabel, nodeOperatorLabel}

networkUptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Expand All @@ -72,6 +40,18 @@ var (
Help: "Effectiveness of the network.",
}, networkLabels)

networkProposerEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_proposer_effectiveness",
Help: "Proposer Effectiveness of the network.",
}, networkLabels)

networkAttesterEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_attester_effectiveness",
Help: "Attester Effectiveness of the network.",
}, networkLabels)

ratedErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "promrated",
Name: "api_error_total",
Expand Down
80 changes: 0 additions & 80 deletions testutil/promrated/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,15 @@
package promrated

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/z"
)

const (
promQuery = "group by (cluster_name, cluster_hash, cluster_network, pubkey_full) (core_scheduler_validator_balance_gwei)"
)

type validator struct {
PubKey string `json:"pubkey_full"`
ClusterName string `json:"cluster_name"`
ClusterHash string `json:"cluster_hash"`
ClusterNetwork string `json:"cluster_network"`
}

// serveMonitoring creates a liveness endpoint and serves metrics to prometheus.
func serveMonitoring(addr string, registry *prometheus.Registry) error {
mux := http.NewServeMux()
Expand All @@ -50,69 +33,6 @@ func serveMonitoring(addr string, registry *prometheus.Registry) error {
return errors.Wrap(server.ListenAndServe(), "failed to serve prometheus metrics")
}

// getValidators queries prometheus and returns a list of validators with associated cluster and pubkey.
func getValidators(ctx context.Context, promEndpoint string, promAuth string) ([]validator, error) {
client := new(http.Client)

url, err := url.ParseRequestURI(promEndpoint)
if err != nil {
return nil, errors.Wrap(err, "parse prometheus endpoint")
}

query := url.Query()
query.Add("query", promQuery)
url.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "new prometheus request")
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", promAuth))

res, err := client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "requesting prom metrics")
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Wrap(err, "reading body")
}

if res.StatusCode/100 != 2 {
return nil, errors.New("not ok http response", z.Str("body", string(body)))
}

return parseValidators(body)
}

// parseValidators reads prometheus response and returns a list of validators.
func parseValidators(body []byte) ([]validator, error) {
var result struct {
Data struct {
Result []struct {
Labels validator `json:"metric"`
} `json:"result"`
} `json:"data"`
}

if err := json.Unmarshal(body, &result); err != nil {
return nil, errors.Wrap(err, "deserializing json")
}

var validators []validator
for _, datum := range result.Data.Result {
if datum.Labels.ClusterName == "" || datum.Labels.ClusterNetwork == "" || datum.Labels.PubKey == "" {
continue
}
validators = append(validators, datum.Labels)
}

return validators, nil
}

func writeResponse(w http.ResponseWriter, status int, msg string) {
w.WriteHeader(status)
_, _ = w.Write([]byte(msg))
Expand Down
65 changes: 0 additions & 65 deletions testutil/promrated/prometheus_internal_test.go

This file was deleted.

80 changes: 27 additions & 53 deletions testutil/promrated/promrated.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
type Config struct {
RatedEndpoint string
RatedAuth string
PromEndpoint string
PromAuth string
MonitoringAddr string
Networks []string
NodeOperators []string
}

// Run blocks running the promrated program until the context is canceled or a fatal error occurs.
func Run(ctx context.Context, config Config) error {
log.Info(ctx, "Promrated started",
z.Str("rated_endpoint", redactURL(config.RatedEndpoint)),
z.Str("prom_auth", config.PromAuth),

Check warning on line 32 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L32

Added line #L32 was not covered by tests
z.Str("monitoring_addr", config.MonitoringAddr),
)

Expand All @@ -42,7 +43,7 @@
serverErr <- serveMonitoring(config.MonitoringAddr, promRegistry)
}()

ticker := time.NewTicker(12 * time.Hour)
ticker := time.NewTicker(24 * time.Hour)

Check warning on line 46 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L46

Added line #L46 was not covered by tests
LukeHackett12 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

onStartup := make(chan struct{}, 1)
Expand All @@ -65,45 +66,10 @@

// report the validator effectiveness metrics for prometheus.
func reportMetrics(ctx context.Context, config Config) {
validators, err := getValidators(ctx, config.PromEndpoint, config.PromAuth)
if err != nil {
log.Error(ctx, "Failed fetching validators from prometheus", err)
return
}

for _, validator := range validators {
log.Info(ctx, "Fetched validator from prometheus",
z.Str("pubkey", validator.PubKey),
z.Str("cluster_name", validator.ClusterName),
z.Str("cluster_network", validator.ClusterNetwork),
)

if contains(config.Networks, validator.ClusterNetwork) {
stats, err := getValidatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, validator)
if err != nil {
log.Error(ctx, "Getting validator statistics", err, z.Str("pubkey", validator.PubKey))
continue
}

clusterLabels := prometheus.Labels{
"pubkey_full": validator.PubKey,
"cluster_name": validator.ClusterName,
"cluster_hash": validator.ClusterHash,
"cluster_network": validator.ClusterNetwork,
}

uptime.With(clusterLabels).Set(stats.Uptime)
correctness.With(clusterLabels).Set(stats.AvgCorrectness)
inclusionDelay.With(clusterLabels).Set(stats.AvgInclusionDelay)
attester.With(clusterLabels).Set(stats.AttesterEffectiveness)
proposer.With(clusterLabels).Set(stats.ProposerEffectiveness)
effectiveness.With(clusterLabels).Set(stats.ValidatorEffectiveness)
}
}

for _, network := range config.Networks {
networkLabels := prometheus.Labels{
"cluster_network": network,
clusterNetworkLabel: network,
nodeOperatorLabel: "all",

Check warning on line 72 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}

stats, err := getNetworkStatistics(ctx, config.RatedEndpoint, config.RatedAuth, network)
Expand All @@ -112,24 +78,32 @@
continue
}

networkUptime.With(networkLabels).Set(stats.AvgUptime)
networkCorrectness.With(networkLabels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(networkLabels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(networkLabels).Set(stats.ValidatorEffectiveness)
}
}
setMetrics(networkLabels, stats)

for _, nodeOperator := range config.NodeOperators {
nodeOperatorLabels := prometheus.Labels{
clusterNetworkLabel: network,
nodeOperatorLabel: nodeOperator,
}

Check warning on line 87 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L81-L87

Added lines #L81 - L87 were not covered by tests

// contains checks if array contains a string s.
func contains(arr []string, s string) bool {
result := false
for _, x := range arr {
if x == s {
result = true
break
stats, err = getNodeOperatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, nodeOperator, network)
if err != nil {
log.Error(ctx, "Getting node operator statistics", err, z.Str("network", network), z.Str("node_operator", nodeOperator))
continue

Check warning on line 92 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L89-L92

Added lines #L89 - L92 were not covered by tests
}

setMetrics(nodeOperatorLabels, stats)

Check warning on line 95 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L95

Added line #L95 was not covered by tests
}
}
}

return result
func setMetrics(labels prometheus.Labels, stats networkEffectivenessData) {
networkUptime.With(labels).Set(stats.AvgUptime)
networkCorrectness.With(labels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(labels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(labels).Set(stats.ValidatorEffectiveness)
networkProposerEffectiveness.With(labels).Set(stats.ProposerEffectiveness)
networkAttesterEffectiveness.With(labels).Set(stats.AttesterEffectiveness)

Check warning on line 106 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L100-L106

Added lines #L100 - L106 were not covered by tests
}

// redactURL returns a redacted version of the given URL.
Expand Down
4 changes: 2 additions & 2 deletions testutil/promrated/promrated/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func newRootCmd(runFunc func(context.Context, promrated.Config) error) *cobra.Co
func bindPromratedFlag(flags *pflag.FlagSet, config *promrated.Config) {
flags.StringVar(&config.RatedEndpoint, "rated-endpoint", "https://api.rated.network", "Rated API endpoint to poll for validator metrics.")
flags.StringVar(&config.RatedAuth, "rated-auth-token", "token", "[REQUIRED] Token for Rated API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9100", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromEndpoint, "prom-endpoint", "https://vm.monitoring.gcp.obol.tech/query", "Endpoint for VMetrics Prometheus API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9200", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromAuth, "prom-auth-token", "token", "[REQUIRED] Token for VMetrics Promtetheus API.")
flags.StringSliceVar(&config.Networks, "networks", nil, "Comma separated list of one or networks to monitor.")
flags.StringSliceVar(&config.NodeOperators, "node-operators", nil, "Comma separated list of one or node operators to monitor.")
}
Loading
Loading