Skip to content

Commit

Permalink
feat: add config file for metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
  • Loading branch information
Rory-Z committed Oct 17, 2023
1 parent 52071dd commit 5591ccb
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 247 deletions.
46 changes: 24 additions & 22 deletions client/client_4.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package client
import (
"emqx-exporter/collector"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"
"net/http"
"strconv"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"
)

var _ client = &cluster4x{}

type cluster4x struct {
version string
client *fasthttp.Client
username string
password string
version string
client *fasthttp.Client
}

func (n *cluster4x) getVersion() string {
Expand All @@ -30,8 +33,7 @@ func (n *cluster4x) getLicense() (lic *collector.LicenseInfo, err error) {
}
Code int
}{}

data, statusCode, err := callHTTPGet(n.client, "/api/v4/license")
data, statusCode, err := callHTTPGet(n.client, "/api/v4/license", n.username, n.password)
if statusCode == http.StatusNotFound {
// open source version doesn't support license api
err = nil
Expand Down Expand Up @@ -80,7 +82,7 @@ func (n *cluster4x) getClusterStatus() (cluster collector.ClusterStatus, err err
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/nodes", &resp)
err = callHTTPGetWithResp(n.client, "/api/v4/nodes", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (n *cluster4x) getBrokerMetrics() (metrics *collector.Broker, err error) {
}
Code int
}{}
data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics")
data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics", n.username, n.password)
if statusCode == http.StatusNotFound {
// open source version doesn't support this api
err = nil
Expand Down Expand Up @@ -153,30 +155,30 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
resp := struct {
Data []struct {
Metrics []struct {
Node string `json:"node"`
Node string `json:"node"`
SpeedMax float64 `json:"speed_max"`
SpeedLast5m float64 `json:"speed_last5m"`
Speed float64 `json:"speed"`
Matched int64 `json:"matched"`
Passed int64 `json:"passed"`
NoResult int64 `json:"no_result"`
Exception int64 `json:"exception"`
Failed int64 `json:"failed"`
Matched int64 `json:"matched"`
Passed int64 `json:"passed"`
NoResult int64 `json:"no_result"`
Exception int64 `json:"exception"`
Failed int64 `json:"failed"`
}
Actions []struct {
Metrics []struct {
Node string `json:"node"`
Taken int64 `json:"taken"`
Success int64 `json:"success"`
Failed int64 `json:"failed"`
Taken int64 `json:"taken"`
Success int64 `json:"success"`
Failed int64 `json:"failed"`
}
}
ID string `json:"id"`
Enabled bool
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", &resp)
err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -226,21 +228,21 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}

func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error) {
bridgesResp := struct {
resp := struct {
Data []struct {
ID string `json:"id"`
Type string
Status bool
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/resources", &bridgesResp)
err = callHTTPGetWithResp(n.client, "/api/v4/resources", n.username, n.password, &resp)
if err != nil {
return
}

bridges = make([]collector.DataBridge, len(bridgesResp.Data))
for i, data := range bridgesResp.Data {
bridges = make([]collector.DataBridge, len(resp.Data))
for i, data := range resp.Data {
enabled := unhealthy
if data.Status {
enabled = healthy
Expand Down
45 changes: 24 additions & 21 deletions client/client_5.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package client
import (
"emqx-exporter/collector"
"fmt"
"github.com/valyala/fasthttp"
"strconv"
"time"

"github.com/valyala/fasthttp"
)

var _ client = &cluster5x{}

type cluster5x struct {
version string
edition edition
client *fasthttp.Client
username string
password string
version string
edition edition
client *fasthttp.Client
}

func (n *cluster5x) getVersion() string {
Expand All @@ -29,7 +32,7 @@ func (n *cluster5x) getLicense() (lic *collector.LicenseInfo, err error) {
MaxConnections int64 `json:"max_connections"`
ExpiryAt string `json:"expiry_at"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/license", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/license", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -60,7 +63,7 @@ func (n *cluster5x) getClusterStatus() (cluster collector.ClusterStatus, err err
Load5 any `json:"load5"`
Load15 any `json:"load15"`
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/nodes", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/nodes", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -106,7 +109,7 @@ func (n *cluster5x) getBrokerMetrics() (metrics *collector.Broker, err error) {
SentMsgRate int64 `json:"sent_msg_rate"`
ReceivedMsgRate int64 `json:"received_msg_rate"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -126,7 +129,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -140,12 +143,12 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
NodeMetrics []struct {
Node string
Metrics struct {
Rate float64 `json:"matched.rate"`
RateLast5m float64 `json:"matched.rate.last5m"`
RateMax float64 `json:"matched.rate.max"`
Matched int64
Passed int64
Failed int64
Rate float64 `json:"matched.rate"`
RateLast5m float64 `json:"matched.rate.last5m"`
RateMax float64 `json:"matched.rate.max"`
Matched int64
Passed int64
Failed int64
Exception int64 `json:"failed.exception"`
NoResult int64 `json:"failed.no_result"`
ActionTotal int64 `json:"actions.total"`
Expand All @@ -154,7 +157,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}
} `json:"node_metrics"`
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), &metricsResp)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), n.username, n.password, &metricsResp)
if err != nil {
return
}
Expand Down Expand Up @@ -187,7 +190,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Type string
Status string
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/bridges", &bridgesResp)
err = callHTTPGetWithResp(n.client, "/api/v5/bridges", n.username, n.password, &bridgesResp)
if err != nil {
return
}
Expand All @@ -211,7 +214,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Dropped int64
}
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), &metricsResp)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), n.username, n.password, &metricsResp)
if err != nil {
return
}
Expand All @@ -230,7 +233,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
Backend string
Enable bool
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/authentication", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/authentication", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -254,7 +257,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), &status)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), n.username, n.password, &status)
if err != nil {
return
}
Expand Down Expand Up @@ -293,7 +296,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -317,7 +320,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), &status)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), n.username, n.password, &status)
if err != nil {
return
}
Expand Down
103 changes: 38 additions & 65 deletions client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,60 @@ package client
import (
"context"
"emqx-exporter/collector"
"emqx-exporter/config"
"fmt"
"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"strconv"
"strings"
"sync"
"time"
)

var (
emqxNodes = kingpin.Flag("emqx.nodes", "The list of EMQX cluster node addr").Default("").String()
emqxUsername = kingpin.Flag("emqx.auth-username", "The username used for emqx api basic auth").Default("").String()
emqxPassword = kingpin.Flag("emqx.auth-password", "The password used for emqx api basic auth").Default("").String()
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

type cluster struct {
client client
nodeLock sync.RWMutex
logger log.Logger
}

func NewCluster(logger log.Logger) collector.Cluster {
addrs := strings.Split(*emqxNodes, ",")
if len(addrs) == 0 {
panic(fmt.Sprintf("Invalid emqx node addrs: %s", *emqxNodes))
}
for _, addr := range addrs {
if !strings.ContainsRune(addr, ':') {
panic(fmt.Sprintf("Invalid emqx node addr: %s", addr))
func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster {
c := &cluster{}

go func() {
httpClient := getHTTPClient(metrics.Target)
for {
client4 := &cluster4x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
}
if _, err := client4.getClusterStatus(); err != nil {
c.client = client4
return
}

client5 := &cluster5x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
}
if _, err := client5.getClusterStatus(); err == nil {
c.client = client5
return
}

level.Error(logger).Log("msg", "Couldn't create cluster client, will retry it after 5 seconds", "err", "no cluster node found")
c.client = nil

select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
}
}
}

if *emqxUsername == "" {
panic("Missing username used for emqx api basic auth")
}
if *emqxPassword == "" {
panic("Missing password used for emqx api basic auth")
}

c := &cluster{
logger: logger,
}
go c.checkNodes()
}()
return c
}

func (c *cluster) checkNodes() {
httpClient := getHTTPClient(*emqxNodes)
var currentVersion string
for {
var client client
var err4, err5 error
client = &cluster4x{client: httpClient}
_, err4 = client.getClusterStatus()
if err4 != nil {
client = &cluster5x{client: httpClient}
_, err5 = client.getClusterStatus()
}
if err4 != nil && err5 != nil {
_ = level.Warn(c.logger).Log("check nodes", "couldn't get node info", "addr", *emqxNodes,
"err4", err4.Error(), "err5", err5.Error())
client = nil
} else if currentVersion != client.getVersion() {
currentVersion = client.getVersion()
_ = level.Info(c.logger).Log("ClusterVersion", currentVersion)
}

c.nodeLock.Lock()
c.client = client
c.nodeLock.Unlock()

select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
}
}
}

func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) {
client := c.getNode()
if client == nil {
Expand All @@ -102,7 +76,6 @@ func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) {
func (c *cluster) GetClusterStatus() (cluster collector.ClusterStatus, err error) {
client := c.getNode()
if client == nil {
cluster.Status = unknown
return
}
cluster, err = client.getClusterStatus()
Expand Down
Loading

0 comments on commit 5591ccb

Please sign in to comment.