
Stored messages require multiple reboots to fully forward #681

Open
jismithc opened this issue Jul 18, 2024 · 3 comments

Comments

@jismithc

Hi! We are hoping someone has seen this issue before or can quickly point out something silly we are doing.

We are currently supporting a container that uses paho.mqtt to forward multiple messages at various cadences, at most one message every 2 seconds.

We are using the store functionality during network outages to store up to 16 minutes of messages and forward them after reconnection. If a network outage lasts 16 minutes, a watchdog reboots our system. After the system reconnects to the network, we would expect all stored messages to be forwarded. Instead, only some of the stored messages are forwarded; we don't see the remaining messages until after a container restart.

What we are seeing is the following:

  • system goes through an outage for >16 minutes, outgoing messages are stored
  • upon reconnection, we see complete outage data from minute 0 to minute 7 backfilled into our db, and partial data from minute 7 to minute 16, followed by these logs:
    ConnectionAttemptHandler - Attempting to connect to MQTT broker
    OnConnectHandler - Connected to MQTT broker, number of messages to resend: 0
    [store] corrupted file detected: EOF archived at: /path/to/store/o.2656.CORRUPT
    [store] corrupted file detected: EOF archived at: /path/to/store/o.2657.CORRUPT
    [store] corrupted file detected: EOF archived at: /path/to/store/o.2658.CORRUPT
    [store] corrupted file detected: EOF archived at: /path/to/store/o.2659.CORRUPT
                    ... < repeated until o.2683.CORRUPT > ...
    Connected to MQTT broker
    [store] store could not delete key: o.2656
    [store] store could not delete key: o.2657
    
  • viewing the directory where the messages are stored, we can see the missing data

    • ls /path/to/store/ | wc -l returns 459
  • reboot the container

  • upon reconnection, we see complete outage data from minute 0 to minute 14 backfilled into our db, and partial data from minute 14 to minute 16, followed by logs similar to the above

  • viewing the directory where the messages are stored, we can see there is still partial data

    • ls /path/to/store/ | wc -l returns 90
  • reboot the container

  • upon reconnection, we see complete outage data from minute 0 to minute 16 backfilled into our db

  • viewing the directory where the messages are stored, we can see there are no more stored messages

Here is how we are initializing our client:

	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(clientID)
	opts.SetTLSConfig(tlsConfig)
	opts.SetCleanSession(false)

	// #1196 - Enable store and forward for QoS 1 and 2 messages.
	opts.SetStore(mqtt.NewFileStore(storeForwardPath))

	// Set timeout to 30 seconds (arbitrary number). Default is no timeout.
	opts.SetWriteTimeout(time.Duration(30) * time.Second)

	// Set handlers to log client messages
	opts.SetDefaultPublishHandler(DefaultPublishHandler)
	opts.SetOnConnectHandler(OnConnectHandler)
	opts.SetConnectionLostHandler(ConnectionLostHandler)
	opts.SetReconnectingHandler(ReconnectingHandler)
	opts.SetConnectionAttemptHandler(ConnectionAttemptHandler)

	// Set up the logger instances for each mqtt log level.
	mqtt.ERROR = newMqttLogger(*slog.Default(), slog.LevelError)
	mqtt.CRITICAL = newMqttLogger(*slog.Default(), slog.LevelError) // Treat as error
	mqtt.WARN = newMqttLogger(*slog.Default(), slog.LevelWarn)

	client := mqtt.NewClient(opts)

Upon publish, we are calling client.Publish() like so:

	if token := client.Publish("/my/mqtt/topic", 1, false, payload); token.Error() != nil {
		return token.Error()
	}

Note: we are not using token.Wait() upon return of the token. With a QoS of 1 during a network outage, we expect token.Wait() will never be fulfilled. Are we correct in assuming this is what we should be doing?
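
For illustration, here is a minimal sketch of what bounding the wait could look like with token.WaitTimeout instead of skipping the wait entirely (the 5-second timeout and the surrounding error handling are arbitrary assumptions):

	// Sketch only: bound the wait rather than skipping it.
	// WaitTimeout returns false if the token is still pending when the timeout expires.
	token := client.Publish("/my/mqtt/topic", 1, false, payload)
	if !token.WaitTimeout(5 * time.Second) {
		// Broker unreachable or slow; the message should remain in the store for resend after reconnect.
		return nil
	}
	return token.Error()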

Another clue that may or may not be relevant - if we disable our watchdog so it does not reboot after 16 minutes, then when the system reconnects to the network, any new (live) messages AND stored messages will NOT be published until a container reboot.

Please let us know if this is an issue seen before. We're hoping there is an obvious flaw in our process that the community can spot quickly. Thanks!

@MattBrittan
Contributor

Interesting - I have seen this before but always put it down to OS buffering (and message loss on a power cycle was not a big issue with my use case). The filestore writes these files out in two steps:

  1. Writes to tempfile (e.g. o.2656.TMP)
  2. Renames the temp file

I believe that this was done to avoid the situation where the file is partially written and then power is lost, but I suspect it does not fully achieve that goal. I wonder if we need to add a Sync.

I've moved all of my apps over to the V5 client and have not seen any corrupt files since doing that. I checked the V5 code and it does call Sync (added because I noticed that the timestamps were sometimes not ordered as expected). As such I'd say that's worth a try - would you be open to testing the change (just add f.Sync() before the Close here).
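
For reference, a rough sketch of the write/sync/rename pattern I'm describing (illustrative Go only, not the exact filestore code; path and data are placeholders):

	// Write to a temp file, flush it to disk, then rename it into place,
	// reducing the chance that a power loss leaves a truncated final file.
	f, err := os.Create(path + ".TMP")
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // this is the flush that appears to be missing
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(path+".TMP", path)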

With a QoS of 1 during a network outage, we expect token.Wait() will never be fulfilled.

The wait would be fulfilled when the connection drops (there is no right answer here, but leaving calls hanging was not really an option; the V5 client handles this better).

Another clue that may or may not be relevant - if we disable our watchdog so it does not reboot after 16 minutes, then when the system reconnects to the network, any new (live) messages AND stored messages will NOT be published until a container reboot.

That's not what I would expect to see (reconnect calls resume, which should send the messages). Would you be open to enabling logging so we can see what is happening there?

Note: Whilst I'm happy to help, I don't actually use this library any more, so I won't put as much time into tracing issues as I would have previously (so the more logs etc. you provide, the more likely it is that I'll do something). I'm still happy to review/merge PRs!

@jismithc
Author

Hi @MattBrittan, sorry for the late reply, and thank you for your quick response.

After reviewing your message, we attempted to move our codebase to the V5 client to utilize its queue functionality to store and forward messages during network outages. From initial testing, it seems the migration to the new client solves all of our issues above.

For posterity, I did try the following without success. I believe I saw the same behavior as before with this change.

would you be open to testing the change (just add f.Sync() before the Close here).

Time is in short supply for me, but if I do have time, I could explore adding logs and debugging this issue with you. For now, I will finalize our migration to paho.golang utilizing the MQTT v5 protocol, as you have done.

Thank you again!

@MattBrittan
Contributor

No worries - I've done a lot of testing with the V5 client (simulating network drops etc.) so I am pretty confident that it will not lose messages (QoS 2 is another matter - it's supported but the implementation could use some work).
I will leave this issue open for a while in case anyone else encounters the same problem and is able to perform some testing (unfortunately these issues often require a specific setup and can be hard to replicate).
