Skip to content

Commit

Permalink
Merge branch 'main' into remove-deprecated
Browse files Browse the repository at this point in the history
  • Loading branch information
duncte123 committed Mar 10, 2024
2 parents 7a22b8c + 3047756 commit e5a6e7e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 27 deletions.
14 changes: 8 additions & 6 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.io.Closeable
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
Expand All @@ -30,8 +31,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
get() = linkMap.values.toList()

// Events forwarded from all nodes.
private val sink: Sinks.Many<ClientEvent<*>> = Sinks.many().multicast().onBackpressureBuffer()
val flux: Flux<ClientEvent<*>> = sink.asFlux()
private val sink: Sinks.Many<ClientEvent> = Sinks.many().multicast().onBackpressureBuffer()
val flux: Flux<ClientEvent> = sink.asFlux()
private val reference: Disposable = flux.subscribe()

/**
Expand Down Expand Up @@ -170,7 +171,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
val voiceRegion = link.cachedPlayer?.voiceRegion

link.state = LinkState.CONNECTING
link.transferNode(loadBalancer.selectNode(region = voiceRegion))
// The delay is used to prevent a race condition in Discord, causing close code 4006
link.transferNode(loadBalancer.selectNode(region = voiceRegion), delay = Duration.ofMillis(1000))
}
}
}
Expand All @@ -183,7 +185,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
*
* @return a [Flux] of [ClientEvent]s
*/
fun <T : ClientEvent<*>> on(type: Class<T>): Flux<T> {
fun <T : ClientEvent> on(type: Class<T>): Flux<T> {
return flux.ofType(type)
}

Expand All @@ -192,7 +194,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
*
* @return a [Flux] of [ClientEvent]s
*/
inline fun <reified T : ClientEvent<*>> on() = on(T::class.java)
inline fun <reified T : ClientEvent> on() = on(T::class.java)

/**
* Close the client and disconnect all nodes.
Expand All @@ -213,7 +215,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
}

private fun listenForNodeEvent(node: LavalinkNode) {
node.on<ClientEvent<Message>>()
node.on<ClientEvent>()
.subscribe {
try {
sink.tryEmitNext(it)
Expand Down
8 changes: 4 additions & 4 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class LavalinkNode(

internal val httpClient = OkHttpClient.Builder().callTimeout(nodeOptions.httpTimeout, TimeUnit.MILLISECONDS).build()

internal val sink: Many<ClientEvent<*>> = Sinks.many().multicast().onBackpressureBuffer()
val flux: Flux<ClientEvent<*>> = sink.asFlux()
internal val sink: Many<ClientEvent> = Sinks.many().multicast().onBackpressureBuffer()
val flux: Flux<ClientEvent> = sink.asFlux()
private val reference: Disposable = flux.subscribe()

internal val rest = LavalinkRestClient(this)
Expand Down Expand Up @@ -89,7 +89,7 @@ class LavalinkNode(
*
* @return a [Flux] of [ClientEvent]s
*/
fun <T : ClientEvent<*>> on(type: Class<T>): Flux<T> {
fun <T : ClientEvent> on(type: Class<T>): Flux<T> {
return flux.ofType(type)
}

Expand All @@ -98,7 +98,7 @@ class LavalinkNode(
*
* @return a [Flux] of [ClientEvent]s
*/
inline fun <reified T : ClientEvent<*>> on() = on(T::class.java)
inline fun <reified T : ClientEvent> on() = on(T::class.java)

/**
* Retrieves a list of all players from the lavalink node.
Expand Down
5 changes: 2 additions & 3 deletions src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ class Link(
*/
fun loadItem(identifier: String) = node.loadItem(identifier)

internal fun transferNode(newNode: LavalinkNode) {
internal fun transferNode(newNode: LavalinkNode, delay: Duration = Duration.ZERO) {
val player = node.getCachedPlayer(guildId)

if (player != null) {
node.removeCachedPlayer(guildId)
newNode.createOrUpdatePlayer(guildId)
.applyBuilder(player.stateToBuilder())
// Delay by 500ms to hopefully prevent a race-condition from triggering
.delayElement(Duration.ofMillis(500))
.delaySubscription(delay)
.subscribe()
}

Expand Down
22 changes: 11 additions & 11 deletions src/main/kotlin/dev/arbjerg/lavalink/client/events.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ internal fun Message.toClientEvent(node: LavalinkNode) = when (this) {
is Message.StatsEvent -> StatsEvent(node, frameStats, players, playingPlayers, uptime, memory, cpu)
}

sealed class ClientEvent<T : Message>(open val node: LavalinkNode)
sealed class ClientEvent(open val node: LavalinkNode)

// Normal events
data class ReadyEvent(override val node: LavalinkNode, val resumed: Boolean, val sessionId: String)
: ClientEvent<Message.ReadyEvent>(node)
: ClientEvent(node)

data class PlayerUpdateEvent(override val node: LavalinkNode, val guildId: Long, val state: PlayerState)
: ClientEvent<Message.PlayerUpdateEvent>(node)
: ClientEvent(node)

data class StatsEvent(
override val node: LavalinkNode,
Expand All @@ -34,23 +34,23 @@ data class StatsEvent(
val uptime: Long,
val memory: Memory,
val cpu: Cpu
) : ClientEvent<Message.StatsEvent>(node)
) : ClientEvent(node)

// Player events
sealed class EmittedEvent<T : Message.EmittedEvent>(override val node: LavalinkNode, open val guildId: Long)
: ClientEvent<T>(node)
sealed class EmittedEvent(override val node: LavalinkNode, open val guildId: Long)
: ClientEvent(node)

data class TrackStartEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track)
: EmittedEvent<Message.EmittedEvent.TrackStartEvent>(node, guildId)
: EmittedEvent(node, guildId)

data class TrackEndEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val endReason: AudioTrackEndReason)
: EmittedEvent<Message.EmittedEvent.TrackEndEvent>(node, guildId)
: EmittedEvent(node, guildId)

data class TrackExceptionEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val exception: TrackException)
: EmittedEvent<Message.EmittedEvent.TrackExceptionEvent>(node, guildId)
: EmittedEvent(node, guildId)

data class TrackStuckEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val thresholdMs: Long)
: EmittedEvent<Message.EmittedEvent.TrackStuckEvent>(node, guildId)
: EmittedEvent(node, guildId)

data class WebSocketClosedEvent(override val node: LavalinkNode, override val guildId: Long, val code: Int, val reason: String, val byRemote: Boolean)
: EmittedEvent<Message.EmittedEvent.WebSocketClosedEvent>(node, guildId)
: EmittedEvent(node, guildId)
2 changes: 1 addition & 1 deletion src/test/kotlin/testScript.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fun main() {
println("[event 2] Node '${event.node.name}' has stats, current players: ${event.playingPlayers}/${event.players}")
}

client.on<EmittedEvent<*>>()
client.on<EmittedEvent>()
.subscribe { event ->
if (event is TrackStartEvent) {
println("Is a track start event!")
Expand Down
27 changes: 25 additions & 2 deletions testbot/src/main/java/me/duncte123/testbot/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dev.arbjerg.lavalink.client.loadbalancing.builtin.VoiceRegionPenaltyProvider;
import dev.arbjerg.lavalink.libraries.jda.JDAVoiceUpdateListener;
import net.dv8tion.jda.api.JDABuilder;
import net.dv8tion.jda.api.entities.channel.unions.AudioChannelUnion;
import net.dv8tion.jda.api.requests.GatewayIntent;
import net.dv8tion.jda.api.utils.cache.CacheFlag;
import org.slf4j.Logger;
Expand All @@ -15,6 +16,8 @@
public class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);

private static final int SESSION_INVALID = 4006;

public static void main(String[] args) throws InterruptedException {
final var token = System.getenv("BOT_TOKEN");
final LavalinkClient client = new LavalinkClient(Helpers.getUserIdFromToken(token));
Expand All @@ -24,15 +27,35 @@ public static void main(String[] args) throws InterruptedException {
registerLavalinkListeners(client);
registerLavalinkNodes(client);

JDABuilder.createDefault(token)
final var jda = JDABuilder.createDefault(token)
.setVoiceDispatchInterceptor(new JDAVoiceUpdateListener(client))
.enableIntents(GatewayIntent.GUILD_VOICE_STATES)
.enableCache(CacheFlag.VOICE_STATE)
.addEventListeners(new JDAListener(client))
.build()
.awaitReady();
}

// Got a lot of 4006 closecodes? Try this "fix"
client.on(WebSocketClosedEvent.class).subscribe((event) -> {
if (event.getCode() == SESSION_INVALID) {
final var guildId = event.getGuildId();
final var guild = jda.getGuildById(guildId);

if (guild == null) {
return;
}

final var connectedChannel = guild.getSelfMember().getVoiceState().getChannel();

// somehow
if (connectedChannel == null) {
return;
}

jda.getDirectAudioController().reconnect(connectedChannel);
}
});
}


private static void registerLavalinkNodes(LavalinkClient client) {
Expand Down

0 comments on commit e5a6e7e

Please sign in to comment.