From 42528559819fe4e540954c3da4565b291160174c Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Tue, 10 Oct 2023 15:40:29 +0200 Subject: [PATCH] Add option to migrate players to other node on disconnect --- .../kotlin/dev/schlaubi/lavakord/Options.kt | 8 ++++++- .../dev/schlaubi/lavakord/audio/Link.kt | 2 +- .../audio/internal/AbstractLavakord.kt | 15 +++++++++++-- .../lavakord/audio/internal/AbstractLink.kt | 22 +++++++++++++------ .../lavakord/audio/internal/LoadBalancer.kt | 16 +++++--------- .../lavakord/audio/internal/NodeImpl.kt | 11 ++++++++-- .../audio/internal/WebsocketPlayer.kt | 20 +++++++++++++---- 7 files changed, 66 insertions(+), 28 deletions(-) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/Options.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/Options.kt index f8636316..b19b5522 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/Options.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/Options.kt @@ -37,12 +37,16 @@ public interface LavaKordOptions { * Configuration for Links and Nodes. * * @property autoReconnect Whether to auto-reconnect links or not + * @property autoMigrateOnDisconnect Whether to try to migrate links from a disconnected node onto a new one. + * This option has no effect if [autoReconnect] is false. If the node is trying to resume, the migration will only + * take place after the node gives up on resuming as per [retry]. * @property resumeTimeout amount of seconds Lavalink will wait to kill all players if the client fails to resume it's connection * @property retry retry strategy (See [Retry] and [LinearRetry]) * @property showTrace whether [RestError.trace] should be populated */ public interface LinkConfig { public val autoReconnect: Boolean + public val autoMigrateOnDisconnect: Boolean public val resumeTimeout: Int public val retry: Retry public val showTrace: Boolean @@ -130,12 +134,13 @@ public data class MutableLavaKordOptions( */ public data class LinkConfig( override var autoReconnect: Boolean = true, + override var autoMigrateOnDisconnect: Boolean = true, override var resumeTimeout: Int = 60, override var retry: Retry = LinearRetry(2.seconds, 60.seconds, 10), override val showTrace: Boolean = false ) : LavaKordOptions.LinkConfig { internal fun seal(): LavaKordOptions.LinkConfig = - ImmutableLavaKordOptions.LinkConfig(autoReconnect, resumeTimeout, retry, showTrace) + ImmutableLavaKordOptions.LinkConfig(autoReconnect, autoMigrateOnDisconnect, resumeTimeout, retry, showTrace) /** * Creates a linear [Retry] strategy. @@ -199,6 +204,7 @@ private data class ImmutableLavaKordOptions( */ data class LinkConfig( override val autoReconnect: Boolean, + override val autoMigrateOnDisconnect: Boolean, override val resumeTimeout: Int, override val retry: Retry, override val showTrace: Boolean diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt index bdb6966d..32c8a674 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt @@ -48,7 +48,7 @@ public interface Link { /** * Called internally when this link is connected or reconnected to a new node without resuming, thereby creating a - * new session. + * new session. This function may also be used if the link is moved to a new session. * @param node The node that was connected to, which may be potentially different from the previously connected node */ public suspend fun onNewSession(node: Node) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt index d573066d..fa7cdd3a 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt @@ -22,7 +22,9 @@ import io.ktor.http.* import io.ktor.serialization.kotlinx.* import io.ktor.serialization.kotlinx.json.* import kotlinx.atomicfu.atomic +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.coroutines.newCoroutineContext import kotlinx.serialization.modules.SerializersModule import kotlinx.serialization.modules.contextual import kotlinx.serialization.modules.plus @@ -132,7 +134,7 @@ public abstract class AbstractLavakord internal constructor( override fun getLink(guildId: ULong): Link { return linksMap.getOrPut(guildId) { - val node = loadBalancer.determineBestNode(guildId) as NodeImpl + val node = loadBalancer.determineBestNode(guildId) ?: error("No nodes are available") buildNewLink(guildId, node) } } @@ -160,7 +162,7 @@ public abstract class AbstractLavakord internal constructor( override fun removeNode(name: String) { val node = nodesMap.remove(name) - requireNotNull(node) { "There is no node with that name" } + requireNotNull(node) { "There is no node with name $name" } node.close() } @@ -180,6 +182,15 @@ public abstract class AbstractLavakord internal constructor( */ protected abstract fun buildNewLink(guildId: ULong, node: Node): Link + internal suspend fun migrateFromDisconnectedNode(disconnectedNode: NodeImpl) { + linksMap.filterValues { it.node == disconnectedNode }.mapNotNull { (guild, link) -> + val newNode = loadBalancer.determineBestNode(guild) ?: return@mapNotNull null + launch { + link.onNewSession(newNode) + } + }.joinAll() + } + /** Called on websocket connect without resuming */ internal suspend fun onNewSession(node: Node) { if (!options.link.autoReconnect) return diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt index 5ff57577..c241e626 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt @@ -4,6 +4,7 @@ import dev.arbjerg.lavalink.protocol.v4.PlayerUpdate import dev.arbjerg.lavalink.protocol.v4.VoiceState import dev.arbjerg.lavalink.protocol.v4.toOmissible import dev.schlaubi.lavakord.audio.Link +import dev.schlaubi.lavakord.audio.Link.State import dev.schlaubi.lavakord.audio.Node import dev.schlaubi.lavakord.audio.player.Player import dev.schlaubi.lavakord.audio.player.node @@ -21,11 +22,11 @@ public abstract class AbstractLink(node: Node, final override val guildId: ULong override val player: Player = WebsocketPlayer(node as NodeImpl, guildId) abstract override val lavakord: AbstractLavakord override var lastChannelId: ULong? = null - override var state: Link.State = Link.State.NOT_CONNECTED + override var state: State = State.NOT_CONNECTED private var cachedVoiceState: VoiceState? = null override suspend fun onDisconnected() { - state = Link.State.NOT_CONNECTED + state = State.NOT_CONNECTED node.destroyPlayer(guildId) cachedVoiceState = null } @@ -33,22 +34,29 @@ public abstract class AbstractLink(node: Node, final override val guildId: ULong override suspend fun onNewSession(node: Node) { this.node = node player.node + state = State.CONNECTING - cachedVoiceState?.let { - node.updatePlayer(guildId, request = PlayerUpdate(voice = it.toOmissible())) + try { + cachedVoiceState?.let { + node.updatePlayer(guildId, request = PlayerUpdate(voice = it.toOmissible())) + } + } catch (e: Exception) { + state = State.NOT_CONNECTED + throw e } + state = State.CONNECTED (player as WebsocketPlayer).recreatePlayer(node as NodeImpl) } override suspend fun destroy() { - val shouldDisconnect = state !== Link.State.DISCONNECTING && state !== Link.State.NOT_CONNECTED - state = Link.State.DESTROYING + val shouldDisconnect = state !== State.DISCONNECTING && state !== State.NOT_CONNECTED + state = State.DESTROYING if (shouldDisconnect) { disconnectAudio() } node.destroyPlayer(guildId) lavakord.removeDestroyedLink(this) - state = Link.State.DESTROYED + state = State.DESTROYED } internal suspend fun onVoiceServerUpdate(update: VoiceState) { diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/LoadBalancer.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/LoadBalancer.kt index 2cb4d474..2bf25f76 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/LoadBalancer.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/LoadBalancer.kt @@ -9,18 +9,12 @@ internal class LoadBalancer( private val lavakord: LavaKord ) { - fun determineBestNode(guildId: ULong): Node { - val leastPenalty = lavakord.nodes - .asSequence() - .filter(Node::available) - .minByOrNull { calculatePenalties(it, penaltyProviders, guildId) } + fun determineBestNode(guildId: ULong): Node? = lavakord.nodes + .asSequence() + .filter(Node::available) + .minByOrNull { calculatePenalties(it, penaltyProviders, guildId) } - checkNotNull(leastPenalty) { "No nodes available" } - - return leastPenalty - } - - // Inspired by: https://github.com/Frederikam/Lavalink-Client/blob/master/src/main/java/lavalink/client/io/LavalinkLoadBalancer.java#L111 + // Inspired by: https://github.com/freyacodes/Lavalink-Client/blob/master/src/main/java/lavalink/client/io/LavalinkLoadBalancer.java#L111 private fun calculatePenalties( node: Node, penaltyProviders: List, diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt index 272cbb02..0006da41 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt @@ -135,9 +135,12 @@ internal class NodeImpl( val reason = session.closeReason.await() val resumeAgain = resume && reason?.knownReason != CloseReason.Codes.NORMAL if (resumeAgain) { - LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" } + LOG.warn { "$name disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" } } else { - LOG.warn { "Disconnected from websocket for: $reason. Not resuming." } + LOG.warn { "$name disconnected from websocket for: $reason. Not resuming." } + if (lavakord.options.link.autoReconnect && lavakord.options.link.autoMigrateOnDisconnect) { + lavakord.migrateFromDisconnectedNode(this) + } } reconnect(resume = resumeAgain) } @@ -214,8 +217,12 @@ internal class NodeImpl( } override fun close() { + available = false lavakord.launch { session.close(CloseReason(CloseReason.Codes.NORMAL, "Close requested by client")) + if (lavakord.options.link.autoReconnect && lavakord.options.link.autoMigrateOnDisconnect) { + lavakord.migrateFromDisconnectedNode(this@NodeImpl) + } } } diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/WebsocketPlayer.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/WebsocketPlayer.kt index 6c038f76..8ca713ee 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/WebsocketPlayer.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/WebsocketPlayer.kt @@ -12,6 +12,8 @@ import dev.schlaubi.lavakord.audio.player.Player import dev.schlaubi.lavakord.rest.models.FiltersObject import dev.schlaubi.lavakord.rest.models.toLavalink import dev.schlaubi.lavakord.rest.updatePlayer +import kotlinx.atomicfu.AtomicBoolean +import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter @@ -38,6 +40,7 @@ internal class WebsocketPlayer(node: NodeImpl, internal val guildId: ULong) : Pl return (lastPosition + elapsedSinceUpdate).coerceAtMost(trackLength) } private var specifiedEndTime: Duration? = null + private val isRecreating = atomic(false) override val volume: Int get() = ((filters.volume ?: 1.0f) * 100).toInt() @@ -90,7 +93,7 @@ internal class WebsocketPlayer(node: NodeImpl, internal val guildId: ULong) : Pl private fun handleNewTrack(event: TrackStartEvent) { updateTime = Clock.System.now() val track = event.track - lastPosition = 0.milliseconds + lastPosition = event.track.info.position.milliseconds playingTrack = track } @@ -127,22 +130,31 @@ internal class WebsocketPlayer(node: NodeImpl, internal val guildId: ULong) : Pl } internal fun provideState(state: PlayerState) { + // After migrating the player to a new node, the new node may send a position of 0 as we are starting a new track. + // This may cause a race condition where the migrated track starts at close to 0:00 even if the start time should + // be later. Ignoring the first player update if it is zero fixes this issue. + if (isRecreating.getAndSet(true) && state.position == 0L) return updateTime = Instant.fromEpochMilliseconds(state.time) lastPosition = state.position.milliseconds } internal suspend fun recreatePlayer(node: NodeImpl) { this.node = node - val position = if (playingTrack == null) Omissible.omitted() else positionDuration.inWholeMilliseconds.toOmissible() - node.updatePlayer(guildId, noReplace = false, PlayerUpdate( + val position = if (playingTrack == null) null else positionDuration.inWholeMilliseconds + + isRecreating.value = true + node.updatePlayer( + guildId, noReplace = false, PlayerUpdate( encodedTrack = playingTrack?.encoded.toOmissible(), identifier = Omissible.omitted(), - position = position, + position = position.toOmissible(), endTime = specifiedEndTime?.inWholeMilliseconds.toOmissible(), volume = volume.toOmissible(), paused = paused.toOmissible(), filters = filters.toLavalink().toOmissible() ) ) + updateTime = Clock.System.now() + lastPosition = position?.milliseconds ?: 0.milliseconds } }