Skip to content

Commit

Permalink
testutil/promrated: update promrated querying (#2844)
Browse files Browse the repository at this point in the history
Promrated now needs to fulfill a slightly different purpose, so this change refactors it to reflect that and adds a needed endpoint.

category: refactor
ticket: #2843
  • Loading branch information
LukeHackett12 committed Feb 1, 2024
1 parent 5f0f964 commit cfe844b
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 342 deletions.
56 changes: 18 additions & 38 deletions testutil/promrated/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,13 @@ import (
"github.com/obolnetwork/charon/app/promauto"
)

var (
validatorLabels = []string{"pubkey_full", "cluster_name", "cluster_hash", "cluster_network"}
networkLabels = []string{"cluster_network"}

uptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_uptime",
Help: "Uptime of a validation key.",
}, validatorLabels)

correctness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_correctness",
Help: "Average correctness of a validation key.",
}, validatorLabels)

inclusionDelay = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_inclusion_delay",
Help: "Average inclusion delay of a validation key.",
}, validatorLabels)

attester = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_attester_effectiveness",
Help: "Attester effectiveness of a validation key.",
}, validatorLabels)

proposer = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_proposer_effectiveness",
Help: "Proposer effectiveness of a validation key.",
}, validatorLabels)
// Label names used for the network-level promrated gauges.
const (
// clusterNetworkLabel is the label key that carries the network name.
clusterNetworkLabel = "cluster_network"
// nodeOperatorLabel is the label key that carries the node operator name;
// network-wide statistics are reported with the value "all".
nodeOperatorLabel = "node_operator"
)

effectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_effectiveness",
Help: "Effectiveness of a validation key.",
}, validatorLabels)
var (
networkLabels = []string{clusterNetworkLabel, nodeOperatorLabel}

networkUptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Expand All @@ -72,6 +40,18 @@ var (
Help: "Effectiveness of the network.",
}, networkLabels)

networkProposerEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_proposer_effectiveness",
Help: "Proposer Effectiveness of the network.",
}, networkLabels)

networkAttesterEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_attester_effectiveness",
Help: "Attester Effectiveness of the network.",
}, networkLabels)

ratedErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "promrated",
Name: "api_error_total",
Expand Down
80 changes: 0 additions & 80 deletions testutil/promrated/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,15 @@
package promrated

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/z"
)

const (
// promQuery is the expression getValidators sends as the "query" parameter. It groups the
// core_scheduler_validator_balance_gwei series by cluster name, cluster hash, cluster network
// and full pubkey.
promQuery = "group by (cluster_name, cluster_hash, cluster_network, pubkey_full) (core_scheduler_validator_balance_gwei)"
)

// validator holds the label set of a single validator as decoded from the "metric" object
// of a prometheus query result.
type validator struct {
// PubKey is the value of the pubkey_full label.
PubKey string `json:"pubkey_full"`
// ClusterName is the value of the cluster_name label.
ClusterName string `json:"cluster_name"`
// ClusterHash is the value of the cluster_hash label.
ClusterHash string `json:"cluster_hash"`
// ClusterNetwork is the value of the cluster_network label.
ClusterNetwork string `json:"cluster_network"`
}

// serveMonitoring creates a liveness endpoint and serves metrics to prometheus.
func serveMonitoring(addr string, registry *prometheus.Registry) error {
mux := http.NewServeMux()
Expand All @@ -50,69 +33,6 @@ func serveMonitoring(addr string, registry *prometheus.Registry) error {
return errors.Wrap(server.ListenAndServe(), "failed to serve prometheus metrics")
}

// getValidators runs promQuery against the given prometheus endpoint and returns the
// validators (cluster labels and pubkey) found in the response.
//
// The request is bound to ctx and carries promAuth as a bearer token. A response with a
// non-2xx status is returned as an error that includes the response body.
func getValidators(ctx context.Context, promEndpoint string, promAuth string) ([]validator, error) {
	endpoint, err := url.ParseRequestURI(promEndpoint)
	if err != nil {
		return nil, errors.Wrap(err, "parse prometheus endpoint")
	}

	// Append the query to whatever parameters the endpoint already carries.
	params := endpoint.Query()
	params.Add("query", promQuery)
	endpoint.RawQuery = params.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, errors.Wrap(err, "new prometheus request")
	}
	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", promAuth))

	httpClient := &http.Client{}
	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, errors.Wrap(err, "requesting prom metrics")
	}
	defer resp.Body.Close()

	// The body is read before the status check so it can be attached to the error below.
	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, errors.Wrap(err, "reading body")
	}

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, errors.New("not ok http response", z.Str("body", string(payload)))
	}

	return parseValidators(payload)
}

// parseValidators decodes a prometheus query response and returns the validators it lists.
// Entries with an empty cluster name, cluster network or pubkey are dropped.
func parseValidators(body []byte) ([]validator, error) {
	// sample is one element of data.result; only its label set is of interest.
	type sample struct {
		Labels validator `json:"metric"`
	}

	var resp struct {
		Data struct {
			Result []sample `json:"result"`
		} `json:"data"`
	}

	err := json.Unmarshal(body, &resp)
	if err != nil {
		return nil, errors.Wrap(err, "deserializing json")
	}

	var kept []validator
	for i := range resp.Data.Result {
		labels := resp.Data.Result[i].Labels
		if labels.ClusterName != "" && labels.ClusterNetwork != "" && labels.PubKey != "" {
			kept = append(kept, labels)
		}
	}

	return kept, nil
}

func writeResponse(w http.ResponseWriter, status int, msg string) {
w.WriteHeader(status)
_, _ = w.Write([]byte(msg))
Expand Down
65 changes: 0 additions & 65 deletions testutil/promrated/prometheus_internal_test.go

This file was deleted.

81 changes: 28 additions & 53 deletions testutil/promrated/promrated.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
type Config struct {
// RatedEndpoint is the Rated API endpoint polled for metrics.
RatedEndpoint string
// RatedAuth is the token used to authenticate against the Rated API.
RatedAuth string
// PromEndpoint is the prometheus API endpoint queried for validators.
PromEndpoint string
// PromAuth is the token used to authenticate against the prometheus API.
PromAuth string
// MonitoringAddr is the listening address of the monitoring HTTP server.
MonitoringAddr string
// Networks lists the networks for which metrics are reported.
Networks []string
// NodeOperators lists the node operators for which metrics are reported per network.
NodeOperators []string
}

// Run blocks running the promrated program until the context is canceled or a fatal error occurs.
func Run(ctx context.Context, config Config) error {
log.Info(ctx, "Promrated started",
z.Str("rated_endpoint", redactURL(config.RatedEndpoint)),
z.Str("prom_auth", config.PromAuth),
z.Str("monitoring_addr", config.MonitoringAddr),
)

Expand All @@ -42,7 +43,8 @@ func Run(ctx context.Context, config Config) error {
serverErr <- serveMonitoring(config.MonitoringAddr, promRegistry)
}()

ticker := time.NewTicker(12 * time.Hour)
// Metrics are produced daily so can preserve Rated CUs
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()

onStartup := make(chan struct{}, 1)
Expand All @@ -65,45 +67,10 @@ func Run(ctx context.Context, config Config) error {

// report the validator effectiveness metrics for prometheus.
func reportMetrics(ctx context.Context, config Config) {
validators, err := getValidators(ctx, config.PromEndpoint, config.PromAuth)
if err != nil {
log.Error(ctx, "Failed fetching validators from prometheus", err)
return
}

for _, validator := range validators {
log.Info(ctx, "Fetched validator from prometheus",
z.Str("pubkey", validator.PubKey),
z.Str("cluster_name", validator.ClusterName),
z.Str("cluster_network", validator.ClusterNetwork),
)

if contains(config.Networks, validator.ClusterNetwork) {
stats, err := getValidatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, validator)
if err != nil {
log.Error(ctx, "Getting validator statistics", err, z.Str("pubkey", validator.PubKey))
continue
}

clusterLabels := prometheus.Labels{
"pubkey_full": validator.PubKey,
"cluster_name": validator.ClusterName,
"cluster_hash": validator.ClusterHash,
"cluster_network": validator.ClusterNetwork,
}

uptime.With(clusterLabels).Set(stats.Uptime)
correctness.With(clusterLabels).Set(stats.AvgCorrectness)
inclusionDelay.With(clusterLabels).Set(stats.AvgInclusionDelay)
attester.With(clusterLabels).Set(stats.AttesterEffectiveness)
proposer.With(clusterLabels).Set(stats.ProposerEffectiveness)
effectiveness.With(clusterLabels).Set(stats.ValidatorEffectiveness)
}
}

for _, network := range config.Networks {
networkLabels := prometheus.Labels{
"cluster_network": network,
clusterNetworkLabel: network,
nodeOperatorLabel: "all",
}

stats, err := getNetworkStatistics(ctx, config.RatedEndpoint, config.RatedAuth, network)
Expand All @@ -112,24 +79,32 @@ func reportMetrics(ctx context.Context, config Config) {
continue
}

networkUptime.With(networkLabels).Set(stats.AvgUptime)
networkCorrectness.With(networkLabels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(networkLabels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(networkLabels).Set(stats.ValidatorEffectiveness)
}
}
setMetrics(networkLabels, stats)

for _, nodeOperator := range config.NodeOperators {
nodeOperatorLabels := prometheus.Labels{
clusterNetworkLabel: network,
nodeOperatorLabel: nodeOperator,
}

// contains checks if array contains a string s.
func contains(arr []string, s string) bool {
result := false
for _, x := range arr {
if x == s {
result = true
break
stats, err = getNodeOperatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, nodeOperator, network)
if err != nil {
log.Error(ctx, "Getting node operator statistics", err, z.Str("network", network), z.Str("node_operator", nodeOperator))
continue
}

setMetrics(nodeOperatorLabels, stats)
}
}
}

return result
// setMetrics copies the values in stats onto the network-level gauges (uptime, correctness,
// inclusion delay, and overall/proposer/attester effectiveness), using labels to select the
// series that is updated.
func setMetrics(labels prometheus.Labels, stats networkEffectivenessData) {
networkUptime.With(labels).Set(stats.AvgUptime)
networkCorrectness.With(labels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(labels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(labels).Set(stats.AvgValidatorEffectiveness)
networkProposerEffectiveness.With(labels).Set(stats.AvgProposerEffectiveness)
networkAttesterEffectiveness.With(labels).Set(stats.AvgAttesterEffectiveness)
}

// redactURL returns a redacted version of the given URL.
Expand Down
4 changes: 2 additions & 2 deletions testutil/promrated/promrated/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func newRootCmd(runFunc func(context.Context, promrated.Config) error) *cobra.Co
// bindPromratedFlag registers the promrated command-line flags on flags and binds each one
// to the matching field of config, together with its default value and help text.
//
// NOTE(review): "monitoring-address" is registered twice below (defaults 127.0.0.1:9100 and
// 127.0.0.1:9200); this looks like the old and new lines of a diff shown together - confirm
// which registration is current and drop the other.
// NOTE(review): the help texts contain typos ("Promtetheus", "one or networks",
// "one or node operators").
func bindPromratedFlag(flags *pflag.FlagSet, config *promrated.Config) {
flags.StringVar(&config.RatedEndpoint, "rated-endpoint", "https://api.rated.network", "Rated API endpoint to poll for validator metrics.")
flags.StringVar(&config.RatedAuth, "rated-auth-token", "token", "[REQUIRED] Token for Rated API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9100", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromEndpoint, "prom-endpoint", "https://vm.monitoring.gcp.obol.tech/query", "Endpoint for VMetrics Prometheus API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9200", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromAuth, "prom-auth-token", "token", "[REQUIRED] Token for VMetrics Promtetheus API.")
flags.StringSliceVar(&config.Networks, "networks", nil, "Comma separated list of one or networks to monitor.")
flags.StringSliceVar(&config.NodeOperators, "node-operators", nil, "Comma separated list of one or node operators to monitor.")
}
Loading

0 comments on commit cfe844b

Please sign in to comment.