diff --git a/config/config.go b/config/config.go index 13f8f16..9b2ea48 100644 --- a/config/config.go +++ b/config/config.go @@ -173,12 +173,12 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { if probe.ClientID == "" { hostname, _ := os.Hostname() hostname = strings.Replace(hostname, ".", "-", -1) - probe.ClientID = "emqx-exporter-probe-" + hostname + fmt.Sprintf("%d", index) + probe.ClientID = fmt.Sprintf("emqx-exporter-probe-%s-%d", hostname, index) } if probe.Topic == "" { hostname, _ := os.Hostname() hostname = strings.Replace(hostname, ".", "-", -1) - probe.Topic = "emqx-exporter-probe/" + hostname + "/" + fmt.Sprintf("%d", index) + probe.Topic = fmt.Sprintf("emqx-exporter-probe/%s/%d", hostname, index) } if probe.KeepAlive == 0 { probe.KeepAlive = 30 diff --git a/main_test.go b/main_test.go index 2c0d98e..95b93ad 100644 --- a/main_test.go +++ b/main_test.go @@ -460,6 +460,7 @@ dashboard { } } } +log.console.level = debug listeners { tcp.fake{ bind = 11883 @@ -590,7 +591,7 @@ rule_engine { mqttxSubResp, err := cli.ContainerCreate(ctx, &container.Config{ Image: mqttxContainer.image, - Cmd: []string{"mqttx", "bench", "sub", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress}, + Cmd: []string{"mqttx", "sub", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress}, }, nil, nil, nil, "mqttx-sub") if err != nil { panic(err) @@ -602,7 +603,7 @@ rule_engine { mqttxPubResp, err := cli.ContainerCreate(ctx, &container.Config{ Image: mqttxContainer.image, - Cmd: []string{"mqttx", "bench", "pub", "-c", "1", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress}, + Cmd: []string{"mqttx", "pub", "-c", "1", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress}, }, nil, nil, nil, "mqttx-pub") if err != nil { panic(err) diff --git a/prober/mqtt.go b/prober/mqtt.go index b8a572f..6403bc8 100644 --- a/prober/mqtt.go +++ b/prober/mqtt.go @@ -3,6 +3,7 @@ package prober import ( "context" "emqx-exporter/config" + "fmt" "sync" "time" @@ -49,6 +50,7 @@ func init() { } func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { + var isReady = make(chan struct{}) var msgChan = make(chan mqtt.Message) opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target) @@ -60,7 +62,8 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { opt.SetTLSConfig(probe.TLSClientConfig.ToTLSConfig()) } opt.SetOnConnectHandler(func(c mqtt.Client) { - level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target) + optReader := c.OptionsReader() + level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID()) token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) { msgChan <- m }) @@ -69,6 +72,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS, "err", token.Error()) return } + isReady <- struct{}{} level.Info(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) }) opt.SetConnectionLostHandler(func(c mqtt.Client, err error) { @@ -80,6 +84,12 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { return nil, token.Error() } + select { + case <-isReady: + case <-time.After(60 * time.Second): + return nil, fmt.Errorf("MQTT probe connect timeout") + } + return &MQTTProbe{ Client: c, MsgChan: msgChan, @@ -102,6 +112,7 @@ func ProbeMQTT(probe config.Probe, logger log.Logger) bool { return false } + level.Info(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS) if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil { return false }