diff --git a/prober/handler.go b/prober/handler.go index d2be2d1..d5bed8b 100644 --- a/prober/handler.go +++ b/prober/handler.go @@ -57,13 +57,13 @@ func Handler(w http.ResponseWriter, r *http.Request, probes []config.Probe, logg registry.MustRegister(probeDurationGauge) start := time.Now() - if ProbeMQTT(probe, logger) { + mp := newMQTTProbe(probe, logger) + if mp != nil && mp.Probe(probe, logger) { probeSuccessGauge.Set(1) } else { probeSuccessGauge.Set(0) } - probeDurationGauge.Set(time.Since(start).Seconds()) - h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{}) - h.ServeHTTP(w, r) + probeDurationGauge.Set(time.Since(start).Seconds()) + promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(w, r) } diff --git a/prober/mqtt.go b/prober/mqtt.go index 6403bc8..d7e6285 100644 --- a/prober/mqtt.go +++ b/prober/mqtt.go @@ -1,10 +1,7 @@ package prober import ( - "context" "emqx-exporter/config" - "fmt" - "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -17,43 +14,12 @@ type MQTTProbe struct { MsgChan <-chan mqtt.Message } -type mqttProbeManager struct { - probes map[string]*MQTTProbe - sync.RWMutex -} - -var manager mqttProbeManager - -func init() { - manager = mqttProbeManager{ - probes: make(map[string]*MQTTProbe), - } - go func() { - for { - manager.Lock() - defer manager.Unlock() - for target, probe := range manager.probes { - if probe == nil { - delete(manager.probes, target) - continue - } - } - manager.Unlock() - - select { - case <-context.Background().Done(): - return - case <-time.After(60 * time.Second): - } - } - }() -} - -func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { +func newMQTTProbe(probe config.Probe, logger log.Logger) *MQTTProbe { var isReady = make(chan struct{}) var msgChan = make(chan mqtt.Message) opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target) + opt.SetCleanSession(true) opt.SetClientID(probe.ClientID) opt.SetUsername(probe.Username) opt.SetPassword(probe.Password) @@ -63,7 +29,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { } opt.SetOnConnectHandler(func(c mqtt.Client) { optReader := c.OptionsReader() - level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID()) + level.Debug(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID()) token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) { msgChan <- m }) @@ -73,7 +39,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { return } isReady <- struct{}{} - level.Info(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) + level.Debug(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) }) opt.SetConnectionLostHandler(func(c mqtt.Client, err error) { level.Error(logger).Log("msg", "Lost connection to MQTT broker", "target", probe.Target, "err", err) @@ -81,51 +47,48 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { c := mqtt.NewClient(opt) if token := c.Connect(); token.Wait() && token.Error() != nil { level.Error(logger).Log("msg", "Failed to connect to MQTT broker", "target", probe.Target, "err", token.Error()) - return nil, token.Error() + return nil } select { case <-isReady: - case <-time.After(60 * time.Second): - return nil, fmt.Errorf("MQTT probe connect timeout") + case <-time.After(time.Duration(probe.KeepAlive) * time.Second): + level.Error(logger).Log("msg", "MQTT probe connect timeout", "target", probe.Target) + return nil } return &MQTTProbe{ Client: c, MsgChan: msgChan, - }, nil + } } -func ProbeMQTT(probe config.Probe, logger log.Logger) bool { - mqttProbe, ok := manager.probes[probe.Target] - if !ok { - var err error - if mqttProbe, err = initMQTTProbe(probe, logger); err != nil { - return false - } - manager.Lock() - defer manager.Unlock() - manager.probes[probe.Target] = mqttProbe - } +func (mp *MQTTProbe) Probe(probe config.Probe, logger log.Logger) bool { + defer mp.Client.Disconnect(0) - if !mqttProbe.Client.IsConnected() { + if !mp.Client.IsConnected() { + level.Error(logger).Log("msg", "MQTT client is not connected", "target", probe.Target) return false } - level.Info(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) - if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil { + level.Debug(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) + + message := "from emqx-exporter MQTT probe" + if token := mp.Client.Publish(probe.Topic, probe.QoS, false, message); token.Wait() && token.Error() != nil { + level.Error(logger).Log("msg", "Failed to publish MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS, "err", token.Error()) return false } select { - case msg := <-mqttProbe.MsgChan: - if msg == nil { - return false + case msg := <-mp.MsgChan: + if msg != nil && string(msg.Payload()) == message { + level.Debug(logger).Log("msg", "MQTT probe receive message success", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) + return true } + level.Error(logger).Log("msg", "MQTT probe receive message failed", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) + return false case <-time.After(time.Duration(probe.KeepAlive) * time.Second): - level.Info(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target) + level.Error(logger).Log("msg", "MQTT probe receive message timeout", "target", probe.Target) return false } - - return true }