Skip to content

Commit

Permalink
Merge pull request #1117 from ripienaar/check_exporter
Browse files Browse the repository at this point in the history
Initial, hidden, prometheus exporter for checks
  • Loading branch information
ripienaar committed Aug 14, 2024
2 parents 5a9c849 + 8fad52e commit 1a3c058
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 128 deletions.
197 changes: 82 additions & 115 deletions cli/server_check_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,20 @@ type SrvCheckCmd struct {
jsReplicaSeenCritical time.Duration
jsReplicaLagCritical uint64

srvName string
srvCPUWarn int
srvCPUCrit int
srvMemWarn int
srvMemCrit int
srvConnWarn int
srvConnCrit int
srvSubsWarn int
srvSubCrit int
srvUptimeWarn time.Duration
srvUptimeCrit time.Duration
srvAuthRequire bool
srvTLSRequired bool
srvJSRequired bool
srvName string
srvCPUWarn int
srvCPUCrit int
srvMemWarn int
srvMemCrit int
srvConnWarn int
srvConnCrit int
srvSubsWarn int
srvSubCrit int
srvUptimeWarn time.Duration
srvUptimeCrit time.Duration
srvAuthRequired bool
srvTLSRequired bool
srvJSRequired bool

msgSubject string
msgAgeWarn time.Duration
Expand All @@ -112,7 +112,10 @@ type SrvCheckCmd struct {
credentialRequiresExpire bool
credential string

useMetadata bool
exporterConfigFile string
exporterPort int
exporterCertificate string
exporterKey string
}

func configureServerCheckCommand(srv *fisk.CmdClause) {
Expand Down Expand Up @@ -150,7 +153,6 @@ When set these settings will be used, but can be overridden using --lag-critical
stream.Flag("msgs-critical", "Critical if there are fewer than this many messages in the stream").PlaceHolder("MSGS").IsSetByUser(&c.streamMessagesCritIsSet).Uint64Var(&c.streamMessagesCrit)
stream.Flag("subjects-warn", "Critical threshold for subjects in the stream").PlaceHolder("SUBJECTS").Default("-1").IsSetByUser(&c.subjectsWarnIsSet).IntVar(&c.subjectsWarn)
stream.Flag("subjects-critical", "Warning threshold for subjects in the stream").PlaceHolder("SUBJECTS").Default("-1").IsSetByUser(&c.subjectsCritIsSet).IntVar(&c.subjectsCrit)
stream.Flag("metadata", "Sets monitoring thresholds from Stream metadata").Default("true").BoolVar(&c.useMetadata)

consumer := check.Command("consumer", "Checks the health of a consumer").Action(c.checkConsumer)
consumer.HelpLong(`These settings can be set using Consumer Metadata in the following form:
Expand All @@ -166,7 +168,6 @@ When set these settings will be used, but can be overridden using --waiting-crit
consumer.Flag("last-delivery-critical", "Time to allow since the last delivery").Default("0s").IsSetByUser(&c.consumerLastDeliveryCriticalIsSet).DurationVar(&c.consumerLastDeliveryCritical)
consumer.Flag("last-ack-critical", "Time to allow since the last ack").Default("0s").IsSetByUser(&c.consumerLastAckCriticalIsSet).DurationVar(&c.consumerLastAckCritical)
consumer.Flag("redelivery-critical", "Maximum number of redeliveries to allow").Default("-1").IsSetByUser(&c.consumerRedeliveryCriticalIsSet).IntVar(&c.consumerRedeliveryCritical)
consumer.Flag("metadata", "Sets monitoring thresholds from Consumer metadata").Default("true").BoolVar(&c.useMetadata)

msg := check.Command("message", "Checks properties of a message stored in a stream").Action(c.checkMsg)
msg.Flag("stream", "The streams to check").Required().StringVar(&c.sourcesStream)
Expand Down Expand Up @@ -206,7 +207,7 @@ When set these settings will be used, but can be overridden using --waiting-crit
serv.Flag("subs-critical", "Critical threshold for number of active subscriptions, supports inversion").IntVar(&c.srvSubCrit)
serv.Flag("uptime-warn", "Warning threshold for server uptime as duration").DurationVar(&c.srvUptimeWarn)
serv.Flag("uptime-critical", "Critical threshold for server uptime as duration").DurationVar(&c.srvUptimeCrit)
serv.Flag("auth-required", "Checks that authentication is enabled").UnNegatableBoolVar(&c.srvAuthRequire)
serv.Flag("auth-required", "Checks that authentication is enabled").UnNegatableBoolVar(&c.srvAuthRequired)
serv.Flag("tls-required", "Checks that TLS is required").UnNegatableBoolVar(&c.srvTLSRequired)
serv.Flag("js-required", "Checks that JetStream is enabled").UnNegatableBoolVar(&c.srvJSRequired)

Expand All @@ -221,6 +222,12 @@ When set these settings will be used, but can be overridden using --waiting-crit
cred.Flag("validity-warn", "Warning threshold for time before expiry").DurationVar(&c.credentialValidityWarn)
cred.Flag("validity-critical", "Critical threshold for time before expiry").DurationVar(&c.credentialValidityCrit)
cred.Flag("require-expiry", "Requires the credential to have expiry set").Default("true").BoolVar(&c.credentialRequiresExpire)

exporter := check.Command("exporter", "Prometheus exporter for server checks").Hidden().Action(c.exporterAction)
exporter.Flag("config", "Exporter configuration").Required().ExistingFileVar(&c.exporterConfigFile)
exporter.Flag("port", "Port to listen on").Default("8080").IntVar(&c.exporterPort)
exporter.Flag("https-key", "Key for HTTPS").ExistingFileVar(&c.exporterKey)
exporter.Flag("https-certificate", "Certificate for HTTPS").ExistingFileVar(&c.exporterCertificate)
}

var (
Expand All @@ -246,47 +253,31 @@ func (c *SrvCheckCmd) checkConsumer(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: fmt.Sprintf("%s_%s", c.sourcesStream, c.consumerName), Check: "consumer", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

_, mgr, err := prepareHelper("", natsOpts()...)
check.CriticalExitIfErr(err, "connection failed: %s", err)

cons, err := mgr.LoadConsumer(c.sourcesStream, c.consumerName)
if err != nil {
check.Critical("consumer load failure: %v", err)
return nil
}

checkOpts := &monitor.ConsumerHealthCheckOptions{}
if c.useMetadata {
checkOpts, err = monitor.ExtractConsumerHealthCheckOptions(cons.Metadata())
if err != nil {
check.Critical("invalid metadata: %v", err)
return nil
}
}
if !c.useMetadata || c.consumerAckOutstandingCriticalIsSet {
if c.consumerAckOutstandingCriticalIsSet {
checkOpts.AckOutstandingCritical = c.consumerAckOutstandingCritical
}
if !c.useMetadata || c.consumerWaitingCriticalIsSet {
if c.consumerWaitingCriticalIsSet {
checkOpts.WaitingCritical = c.consumerWaitingCritical
}
if !c.useMetadata || c.consumerUnprocessedCriticalIsSet {
if c.consumerUnprocessedCriticalIsSet {
checkOpts.UnprocessedCritical = c.consumerUnprocessedCritical
}
if !c.useMetadata || c.consumerLastDeliveryCriticalIsSet {
checkOpts.LastDeliveryCritical = c.consumerLastDeliveryCritical
if c.consumerLastDeliveryCriticalIsSet {
checkOpts.LastDeliveryCritical = c.consumerLastDeliveryCritical.Seconds()
}
if !c.useMetadata || c.consumerLastAckCriticalIsSet {
checkOpts.LastAckCritical = c.consumerLastAckCritical
if c.consumerLastAckCriticalIsSet {
checkOpts.LastAckCritical = c.consumerLastAckCritical.Seconds()
}
if !c.useMetadata || c.consumerRedeliveryCriticalIsSet {
if c.consumerRedeliveryCriticalIsSet {
checkOpts.RedeliveryCritical = c.consumerRedeliveryCritical
}

logger := api.NewDiscardLogger()
if opts().Trace {
logger = api.NewDefaultLogger(api.TraceLevel)
}
err = monitor.ConsumerHealthCheck(cons, check, *checkOpts, logger)
err := monitor.ConsumerHealthCheck(opts().Config.ServerURL(), natsOpts(), check, *checkOpts, logger)
if err != nil {
return fmt.Errorf("health check failed: %v", err)
}
Expand All @@ -298,12 +289,7 @@ func (c *SrvCheckCmd) checkKV(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: c.kvBucket, Check: "kv", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

nc, _, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed: %s", err) {
return nil
}

return monitor.CheckKVBucketAndKey(nc, check, monitor.KVCheckOptions{
return monitor.CheckKVBucketAndKey(opts().Config.ServerURL(), natsOpts(), check, monitor.KVCheckOptions{
Bucket: c.kvBucket,
Key: c.kvKey,
ValuesCritical: c.kvValuesCrit,
Expand All @@ -315,24 +301,29 @@ func (c *SrvCheckCmd) checkSrv(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: c.srvName, Check: "server", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

nc, _, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed: %s", err) {
return nil
}

return monitor.CheckServer(nc, check, opts().Timeout, monitor.ServerCheckOptions{})
return monitor.CheckServer(opts().Config.ServerURL(), natsOpts(), check, opts().Timeout, monitor.ServerCheckOptions{
Name: c.srvName,
CPUWarning: c.srvCPUWarn,
CPUCritical: c.srvCPUCrit,
MemoryWarning: c.srvMemWarn,
MemoryCritical: c.srvMemCrit,
ConnectionsWarning: c.srvConnWarn,
ConnectionsCritical: c.srvConnCrit,
SubscriptionsWarning: c.srvSubsWarn,
SubscriptionsCritical: c.srvSubCrit,
UptimeWarning: c.srvUptimeWarn.Seconds(),
UptimeCritical: c.srvUptimeCrit.Seconds(),
AuthenticationRequired: c.srvAuthRequired,
TLSRequired: c.srvTLSRequired,
JetStreamRequired: c.srvJSRequired,
})
}

func (c *SrvCheckCmd) checkJS(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: "JetStream", Check: "jetstream", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

nc, _, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed: %s", err) {
return nil
}

return monitor.CheckJetStreamAccount(nc, check, monitor.JetStreamAccountOptions{
return monitor.CheckJetStreamAccount(opts().Config.ServerURL(), natsOpts(), check, monitor.JetStreamAccountOptions{
MemoryWarning: c.jsMemWarn,
MemoryCritical: c.jsMemCritical,
FileWarning: c.jsStoreWarn,
Expand All @@ -342,7 +333,7 @@ func (c *SrvCheckCmd) checkJS(_ *fisk.ParseContext) error {
ConsumersWarning: c.jsConsumersWarn,
ConsumersCritical: c.jsConsumersCritical,
CheckReplicas: c.jsReplicas,
ReplicaSeenCritical: c.jsReplicaSeenCritical,
ReplicaSeenCritical: c.jsReplicaSeenCritical.Seconds(),
ReplicaLagCritical: c.jsReplicaLagCritical,
})
}
Expand All @@ -351,79 +342,60 @@ func (c *SrvCheckCmd) checkRaft(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: "JetStream Meta Cluster", Check: "meta"}
defer check.GenericExit()

nc, _, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed: %s", err) {
return nil
}

return monitor.CheckJetstreamMeta(nc, check, monitor.CheckMetaOptions{
return monitor.CheckJetstreamMeta(opts().Config.ServerURL(), natsOpts(), check, monitor.CheckMetaOptions{
ExpectServers: c.raftExpect,
LagCritical: c.raftLagCritical,
SeenCritical: c.raftSeenCritical,
SeenCritical: c.raftSeenCritical.Seconds(),
})
}

func (c *SrvCheckCmd) checkStream(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: c.sourcesStream, Check: "stream", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

_, mgr, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed: %s", err) {
return nil
}

stream, err := mgr.LoadStream(c.sourcesStream)
if check.CriticalIfErr(err, "could not load stream %s: %s", c.sourcesStream, err) {
return nil
checkOpts := &monitor.StreamHealthCheckOptions{
StreamName: c.sourcesStream,
}

checkOpts := &monitor.StreamHealthCheckOptions{}
if c.useMetadata {
checkOpts, err = monitor.ExtractStreamHealthCheckOptions(stream.Metadata())
if check.CriticalIfErr(err, "Invalid metadata: %s", err) {
return nil
}
}

if !c.useMetadata || c.sourcesLagCriticalIsSet {
if c.sourcesLagCriticalIsSet {
checkOpts.SourcesLagCritical = c.sourcesLagCritical
}
if !c.useMetadata || c.sourcesSeenCriticalIsSet {
checkOpts.SourcesSeenCritical = c.sourcesSeenCritical
if c.sourcesSeenCriticalIsSet {
checkOpts.SourcesSeenCritical = c.sourcesSeenCritical.Seconds()
}
if !c.useMetadata || c.sourcesMinSourcesIsSet {
if c.sourcesMinSourcesIsSet {
checkOpts.MinSources = c.sourcesMinSources
}
if !c.useMetadata || c.sourcesMaxSourcesIsSet {
if c.sourcesMaxSourcesIsSet {
checkOpts.MaxSources = c.sourcesMaxSources
}
if !c.useMetadata || c.raftExpectIsSet {
if c.raftExpectIsSet {
checkOpts.ClusterExpectedPeers = c.raftExpect
}
if !c.useMetadata || c.raftLagCriticalIsSet {
if c.raftLagCriticalIsSet {
checkOpts.ClusterLagCritical = c.raftLagCritical
}
if !c.useMetadata || c.raftSeenCriticalIsSet {
checkOpts.ClusterSeenCritical = c.raftSeenCritical
if c.raftSeenCriticalIsSet {
checkOpts.ClusterSeenCritical = c.raftSeenCritical.Seconds()
}
if !c.useMetadata || c.streamMessagesWarnIsSet {
if c.streamMessagesWarnIsSet {
checkOpts.MessagesWarn = c.streamMessagesWarn
}
if !c.useMetadata || c.streamMessagesCritIsSet {
if c.streamMessagesCritIsSet {
checkOpts.MessagesCrit = c.streamMessagesCrit
}
if !c.useMetadata || c.subjectsWarnIsSet {
if c.subjectsWarnIsSet {
checkOpts.SubjectsWarn = c.subjectsWarn
}
if !c.useMetadata || c.subjectsCritIsSet {
if c.subjectsCritIsSet {
checkOpts.SubjectsCrit = c.subjectsCrit
}

logger := api.NewDiscardLogger()
if opts().Trace {
logger = api.NewDefaultLogger(api.TraceLevel)
}
err = monitor.StreamHealthCheck(stream, check, *checkOpts, logger)
err := monitor.StreamHealthCheck(opts().Config.ServerURL(), natsOpts(), check, *checkOpts, logger)
check.CriticalIfErr(err, "Healthcheck failed: %s", err)

return nil
Expand All @@ -433,16 +405,11 @@ func (c *SrvCheckCmd) checkMsg(_ *fisk.ParseContext) error {
check := &monitor.Result{Name: "Stream Message", Check: "message", OutFile: checkRenderOutFile, NameSpace: opts().PrometheusNamespace, RenderFormat: checkRenderFormat}
defer check.GenericExit()

_, mgr, err := prepareHelper("", natsOpts()...)
if check.CriticalIfErr(err, "connection failed") {
return nil
}

return monitor.CheckStreamMessage(mgr, check, monitor.CheckStreamMessageOptions{
return monitor.CheckStreamMessage(opts().Config.ServerURL(), natsOpts(), check, monitor.CheckStreamMessageOptions{
StreamName: c.sourcesStream,
Subject: c.msgSubject,
AgeWarning: c.msgAgeWarn,
AgeCritical: c.msgAgeCrit,
AgeWarning: c.msgAgeWarn.Seconds(),
AgeCritical: c.msgAgeCrit.Seconds(),
Content: c.msgRegexp.String(),
BodyAsTimestamp: c.msgBodyAsTs,
})
Expand All @@ -460,12 +427,12 @@ func (c *SrvCheckCmd) checkConnection(_ *fisk.ParseContext) error {
}

return monitor.CheckConnection(opts().Config.ServerURL(), natsOpts(), opts().Timeout, check, monitor.ConnectionCheckOptions{
ConnectTimeWarning: c.connectWarning,
ConnectTimeCritical: c.connectCritical,
ServerRttWarning: c.rttWarning,
ServerRttCritical: c.rttCritical,
RequestRttWarning: c.reqWarning,
RequestRttCritical: c.reqCritical,
ConnectTimeWarning: c.connectWarning.Seconds(),
ConnectTimeCritical: c.connectCritical.Seconds(),
ServerRttWarning: c.rttWarning.Seconds(),
ServerRttCritical: c.rttCritical.Seconds(),
RequestRttWarning: c.reqWarning.Seconds(),
RequestRttCritical: c.reqCritical.Seconds(),
})
}

Expand All @@ -475,8 +442,8 @@ func (c *SrvCheckCmd) checkCredentialAction(_ *fisk.ParseContext) error {

return monitor.CheckCredential(check, monitor.CredentialCheckOptions{
File: c.credential,
ValidityWarning: c.credentialValidityWarn,
ValidityCritical: c.credentialValidityCrit,
ValidityWarning: c.credentialValidityWarn.Seconds(),
ValidityCritical: c.credentialValidityCrit.Seconds(),
RequiresExpiry: c.credentialRequiresExpire,
})
}
42 changes: 42 additions & 0 deletions cli/server_check_exporter_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cli

import (
	"fmt"
	"log"
	"net/http"

	"github.com/choria-io/fisk"
	"github.com/nats-io/natscli/internal/exporter"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// exporterAction runs the hidden "exporter" sub command: it builds an exporter
// from the configuration file given with --config, registers it with the
// default Prometheus registry and serves it on /metrics using the default
// HTTP mux.
//
// When both --https-certificate and --https-key are supplied the endpoint is
// served over HTTPS, when neither is supplied it is served over plain HTTP.
// The call blocks while the HTTP server is running and returns the error the
// server stopped with.
func (c *SrvCheckCmd) exporterAction(_ *fisk.ParseContext) error {
	exp, err := exporter.NewExporter(opts().PrometheusNamespace, c.exporterConfigFile)
	if err != nil {
		return err
	}

	haveCert := c.exporterCertificate != ""
	haveKey := c.exporterKey != ""

	// A lone certificate or key would otherwise silently downgrade the
	// listener to plain HTTP, refuse that before anything is registered.
	if haveCert != haveKey {
		return fmt.Errorf("both https-certificate and https-key are required to enable HTTPS")
	}

	prometheus.MustRegister(exp)
	http.Handle("/metrics", promhttp.Handler())

	addr := fmt.Sprintf(":%d", c.exporterPort)

	if haveCert && haveKey {
		log.Printf("NATS CLI Prometheus Exporter listening on https://0.0.0.0:%d/metrics", c.exporterPort)
		return http.ListenAndServeTLS(addr, c.exporterCertificate, c.exporterKey, nil)
	}

	log.Printf("NATS CLI Prometheus Exporter listening on http://0.0.0.0:%d/metrics", c.exporterPort)
	return http.ListenAndServe(addr, nil)
}
Loading

0 comments on commit 1a3c058

Please sign in to comment.