From 56fe8c571e6ece3f67a114526aef83c9a1de37aa Mon Sep 17 00:00:00 2001 From: Duncan Sterken Date: Sun, 10 Mar 2024 16:12:19 +0000 Subject: [PATCH 1/2] Remove types on events (#21) --- .../arbjerg/lavalink/client/LavalinkClient.kt | 10 ++++----- .../arbjerg/lavalink/client/LavalinkNode.kt | 8 +++---- .../dev/arbjerg/lavalink/client/events.kt | 22 +++++++++---------- src/test/kotlin/testScript.kt | 2 +- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt index ad69a0d..6d8a80f 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt @@ -32,8 +32,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { get() = linkMap.values.toList() // Events forwarded from all nodes. - private val sink: Sinks.Many> = Sinks.many().multicast().onBackpressureBuffer() - val flux: Flux> = sink.asFlux() + private val sink: Sinks.Many = Sinks.many().multicast().onBackpressureBuffer() + val flux: Flux = sink.asFlux() private val reference: Disposable = flux.subscribe() /** @@ -216,7 +216,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { * * @return a [Flux] of [ClientEvent]s */ - fun > on(type: Class): Flux { + fun on(type: Class): Flux { return flux.ofType(type) } @@ -225,7 +225,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { * * @return a [Flux] of [ClientEvent]s */ - inline fun > on() = on(T::class.java) + inline fun on() = on(T::class.java) /** * Close the client and disconnect all nodes. @@ -246,7 +246,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { } private fun listenForNodeEvent(node: LavalinkNode) { - node.on>() + node.on() .subscribe { try { sink.tryEmitNext(it) diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt index 9489fd6..a33fe52 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt @@ -49,8 +49,8 @@ class LavalinkNode( internal val httpClient = OkHttpClient.Builder().callTimeout(nodeOptions.httpTimeout, TimeUnit.MILLISECONDS).build() - internal val sink: Many> = Sinks.many().multicast().onBackpressureBuffer() - val flux: Flux> = sink.asFlux() + internal val sink: Many = Sinks.many().multicast().onBackpressureBuffer() + val flux: Flux = sink.asFlux() private val reference: Disposable = flux.subscribe() internal val rest = LavalinkRestClient(this) @@ -89,7 +89,7 @@ class LavalinkNode( * * @return a [Flux] of [ClientEvent]s */ - fun > on(type: Class): Flux { + fun on(type: Class): Flux { return flux.ofType(type) } @@ -98,7 +98,7 @@ class LavalinkNode( * * @return a [Flux] of [ClientEvent]s */ - inline fun > on() = on(T::class.java) + inline fun on() = on(T::class.java) /** * Retrieves a list of all players from the lavalink node. diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/events.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/events.kt index c7ffb70..a1b8cd0 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/events.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/events.kt @@ -17,14 +17,14 @@ internal fun Message.toClientEvent(node: LavalinkNode) = when (this) { is Message.StatsEvent -> StatsEvent(node, frameStats, players, playingPlayers, uptime, memory, cpu) } -sealed class ClientEvent(open val node: LavalinkNode) +sealed class ClientEvent(open val node: LavalinkNode) // Normal events data class ReadyEvent(override val node: LavalinkNode, val resumed: Boolean, val sessionId: String) - : ClientEvent(node) + : ClientEvent(node) data class PlayerUpdateEvent(override val node: LavalinkNode, val guildId: Long, val state: PlayerState) - : ClientEvent(node) + : ClientEvent(node) data class StatsEvent( override val node: LavalinkNode, @@ -34,23 +34,23 @@ data class StatsEvent( val uptime: Long, val memory: Memory, val cpu: Cpu -) : ClientEvent(node) +) : ClientEvent(node) // Player events -sealed class EmittedEvent(override val node: LavalinkNode, open val guildId: Long) - : ClientEvent(node) +sealed class EmittedEvent(override val node: LavalinkNode, open val guildId: Long) + : ClientEvent(node) data class TrackStartEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track) - : EmittedEvent(node, guildId) + : EmittedEvent(node, guildId) data class TrackEndEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val endReason: AudioTrackEndReason) - : EmittedEvent(node, guildId) + : EmittedEvent(node, guildId) data class TrackExceptionEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val exception: TrackException) - : EmittedEvent(node, guildId) + : EmittedEvent(node, guildId) data class TrackStuckEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val thresholdMs: Long) - : EmittedEvent(node, guildId) + : EmittedEvent(node, guildId) data class WebSocketClosedEvent(override val node: LavalinkNode, override val guildId: Long, val code: Int, val reason: String, val byRemote: Boolean) - : EmittedEvent(node, guildId) + : EmittedEvent(node, guildId) diff --git a/src/test/kotlin/testScript.kt b/src/test/kotlin/testScript.kt index abf845b..4536f0f 100644 --- a/src/test/kotlin/testScript.kt +++ b/src/test/kotlin/testScript.kt @@ -49,7 +49,7 @@ fun main() { println("[event 2] Node '${event.node.name}' has stats, current players: ${event.playingPlayers}/${event.players}") } - client.on>() + client.on() .subscribe { event -> if (event is TrackStartEvent) { println("Is a track start event!") From 3047756a7b6035663884d2dae03911cdaff705ff Mon Sep 17 00:00:00 2001 From: Duncan Sterken Date: Sun, 10 Mar 2024 16:14:23 +0000 Subject: [PATCH 2/2] Add delay on node transfer to hopefully prevent 4006 (#24) * Fix transfer delay, double delay to 1000ms * Add closecode reconnect example --------- Co-authored-by: Freya Arbjerg --- .../arbjerg/lavalink/client/LavalinkClient.kt | 4 ++- .../dev/arbjerg/lavalink/client/Link.kt | 5 ++-- .../main/java/me/duncte123/testbot/Main.java | 27 +++++++++++++++++-- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt index 6d8a80f..725fd80 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import java.io.Closeable import java.net.URI +import java.time.Duration import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.Executors @@ -203,7 +204,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { val voiceRegion = link.cachedPlayer?.voiceRegion link.state = LinkState.CONNECTING - link.transferNode(loadBalancer.selectNode(region = voiceRegion)) + // The delay is used to prevent a race condition in Discord, causing close code 4006 + link.transferNode(loadBalancer.selectNode(region = voiceRegion), delay = Duration.ofMillis(1000)) } } } diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt index 6bf57a4..2eb26ee 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt @@ -61,15 +61,14 @@ class Link( */ fun loadItem(identifier: String) = node.loadItem(identifier) - internal fun transferNode(newNode: LavalinkNode) { + internal fun transferNode(newNode: LavalinkNode, delay: Duration = Duration.ZERO) { val player = node.getCachedPlayer(guildId) if (player != null) { node.removeCachedPlayer(guildId) newNode.createOrUpdatePlayer(guildId) .applyBuilder(player.stateToBuilder()) - // Delay by 500ms to hopefully prevent a race-condition from triggering - .delayElement(Duration.ofMillis(500)) + .delaySubscription(delay) .subscribe() } diff --git a/testbot/src/main/java/me/duncte123/testbot/Main.java b/testbot/src/main/java/me/duncte123/testbot/Main.java index 69bd5f8..e76cf15 100644 --- a/testbot/src/main/java/me/duncte123/testbot/Main.java +++ b/testbot/src/main/java/me/duncte123/testbot/Main.java @@ -5,6 +5,7 @@ import dev.arbjerg.lavalink.client.loadbalancing.builtin.VoiceRegionPenaltyProvider; import dev.arbjerg.lavalink.libraries.jda.JDAVoiceUpdateListener; import net.dv8tion.jda.api.JDABuilder; +import net.dv8tion.jda.api.entities.channel.unions.AudioChannelUnion; import net.dv8tion.jda.api.requests.GatewayIntent; import net.dv8tion.jda.api.utils.cache.CacheFlag; import org.slf4j.Logger; @@ -16,6 +17,8 @@ public class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); + private static final int SESSION_INVALID = 4006; + public static void main(String[] args) throws InterruptedException { final var token = System.getenv("BOT_TOKEN"); final LavalinkClient client = new LavalinkClient(Helpers.getUserIdFromToken(token)); @@ -25,15 +28,35 @@ public static void main(String[] args) throws InterruptedException { registerLavalinkListeners(client); registerLavalinkNodes(client); - JDABuilder.createDefault(token) + final var jda = JDABuilder.createDefault(token) .setVoiceDispatchInterceptor(new JDAVoiceUpdateListener(client)) .enableIntents(GatewayIntent.GUILD_VOICE_STATES) .enableCache(CacheFlag.VOICE_STATE) .addEventListeners(new JDAListener(client)) .build() .awaitReady(); - } + // Got a lot of 4006 closecodes? Try this "fix" + client.on(WebSocketClosedEvent.class).subscribe((event) -> { + if (event.getCode() == SESSION_INVALID) { + final var guildId = event.getGuildId(); + final var guild = jda.getGuildById(guildId); + + if (guild == null) { + return; + } + + final var connectedChannel = guild.getSelfMember().getVoiceState().getChannel(); + + // somehow + if (connectedChannel == null) { + return; + } + + jda.getDirectAudioController().reconnect(connectedChannel); + } + }); + } private static void registerLavalinkNodes(LavalinkClient client) {