Skip to content

Commit

Permalink
refactor: refactor mqtt probe
Browse files Browse the repository at this point in the history
disconnect mqtt when every prometherus scrape compliete

Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
  • Loading branch information
Rory-Z committed Jan 11, 2024
1 parent c876518 commit ca35096
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 66 deletions.
8 changes: 4 additions & 4 deletions prober/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func Handler(w http.ResponseWriter, r *http.Request, probes []config.Probe, logg
registry.MustRegister(probeDurationGauge)

start := time.Now()
if ProbeMQTT(probe, logger) {
mp := newMQTTProbe(probe, logger)
if mp != nil && mp.Probe(probe, logger) {
probeSuccessGauge.Set(1)
} else {
probeSuccessGauge.Set(0)
}
probeDurationGauge.Set(time.Since(start).Seconds())

h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
probeDurationGauge.Set(time.Since(start).Seconds())
promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(w, r)
}
87 changes: 25 additions & 62 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package prober

import (
"context"
"emqx-exporter/config"
"fmt"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -17,43 +14,12 @@ type MQTTProbe struct {
MsgChan <-chan mqtt.Message
}

type mqttProbeManager struct {
probes map[string]*MQTTProbe
sync.RWMutex
}

var manager mqttProbeManager

func init() {
manager = mqttProbeManager{
probes: make(map[string]*MQTTProbe),
}
go func() {
for {
manager.Lock()
defer manager.Unlock()
for target, probe := range manager.probes {
if probe == nil {
delete(manager.probes, target)
continue
}
}
manager.Unlock()

select {
case <-context.Background().Done():
return
case <-time.After(60 * time.Second):
}
}
}()
}

func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
func newMQTTProbe(probe config.Probe, logger log.Logger) *MQTTProbe {
var isReady = make(chan struct{})
var msgChan = make(chan mqtt.Message)

opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target)
opt.SetCleanSession(true)
opt.SetClientID(probe.ClientID)
opt.SetUsername(probe.Username)
opt.SetPassword(probe.Password)
Expand All @@ -63,7 +29,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
}
opt.SetOnConnectHandler(func(c mqtt.Client) {
optReader := c.OptionsReader()
level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID())
level.Debug(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID())
token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) {
msgChan <- m
})
Expand All @@ -73,59 +39,56 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
return
}
isReady <- struct{}{}
level.Info(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
level.Debug(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
})
opt.SetConnectionLostHandler(func(c mqtt.Client, err error) {
level.Error(logger).Log("msg", "Lost connection to MQTT broker", "target", probe.Target, "err", err)
})
c := mqtt.NewClient(opt)
if token := c.Connect(); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to connect to MQTT broker", "target", probe.Target, "err", token.Error())
return nil, token.Error()
return nil
}

select {
case <-isReady:
case <-time.After(60 * time.Second):
return nil, fmt.Errorf("MQTT probe connect timeout")
case <-time.After(time.Duration(probe.KeepAlive) * time.Second):
level.Error(logger).Log("msg", "MQTT probe connect timeout", "target", probe.Target)
return nil
}

return &MQTTProbe{
Client: c,
MsgChan: msgChan,
}, nil
}
}

func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
mqttProbe, ok := manager.probes[probe.Target]
if !ok {
var err error
if mqttProbe, err = initMQTTProbe(probe, logger); err != nil {
return false
}
manager.Lock()
defer manager.Unlock()
manager.probes[probe.Target] = mqttProbe
}
func (mp *MQTTProbe) Probe(probe config.Probe, logger log.Logger) bool {
defer mp.Client.Disconnect(0)

if !mqttProbe.Client.IsConnected() {
if !mp.Client.IsConnected() {
level.Error(logger).Log("msg", "MQTT client is not connected", "target", probe.Target)
return false
}

level.Info(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil {
level.Debug(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)

message := "from emqx-exporter MQTT probe"
if token := mp.Client.Publish(probe.Topic, probe.QoS, false, message); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to publish MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS, "err", token.Error())
return false
}

select {
case msg := <-mqttProbe.MsgChan:
if msg == nil {
return false
case msg := <-mp.MsgChan:
if msg != nil && string(msg.Payload()) == message {
level.Debug(logger).Log("msg", "MQTT probe receive message success", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
return true
}
level.Error(logger).Log("msg", "MQTT probe receive message failed", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
return false
case <-time.After(time.Duration(probe.KeepAlive) * time.Second):
level.Info(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target)
level.Error(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target)
return false
}

return true
}

0 comments on commit ca35096

Please sign in to comment.