Skip to content

Commit

Permalink
Make links and nodes thread-safe + add a getter for links
Browse files Browse the repository at this point in the history
  • Loading branch information
duncte123 committed Feb 1, 2024
1 parent 63aa962 commit b428c13
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.io.Closeable
import java.net.URI
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

/**
* @param userId ID of the bot for authenticating with Discord
*/
class LavalinkClient(val userId: Long) : Closeable, Disposable {
private val internalNodes = mutableListOf<LavalinkNode>()
private val links = mutableMapOf<Long, Link>()
private val internalNodes = CopyOnWriteArrayList<LavalinkNode>()
private val linkMap = ConcurrentHashMap<Long, Link>()
private var clientOpen = true

// Immutable public list
val nodes: List<LavalinkNode>
get() = internalNodes.toList()

val links: List<Link>
get() = linkMap.values.toList()

// Events forwarded from all nodes.
private val sink: Sinks.Many<ClientEvent<*>> = Sinks.many().multicast().onBackpressureBuffer()
val flux: Flux<ClientEvent<*>> = sink.asFlux()
Expand Down Expand Up @@ -70,6 +75,9 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
return node
}

/**
* Remove a node by its [name].
*/
fun removeNode(name: String): Boolean {
val node = nodes.firstOrNull { it.name == name }

Expand All @@ -80,6 +88,9 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
return removeNode(node)
}

/**
* Disconnect and remove a node the client.
*/
fun removeNode(node: LavalinkNode): Boolean {
if (node !in internalNodes) {
return false
Expand All @@ -100,19 +111,19 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
*/
@JvmOverloads
fun getLink(guildId: Long, region: VoiceRegion? = null): Link {
if (guildId !in links) {
if (!linkMap.containsKey(guildId)) {
val bestNode = loadBalancer.selectNode(region)
links[guildId] = Link(guildId, bestNode)
linkMap[guildId] = Link(guildId, bestNode)
}

return links[guildId]!!
return linkMap[guildId]!!
}

/**
* Returns a [Link] if it exists in the cache.
* If we select a link for voice updates, we don't know the region yet.
*/
fun getLinkIfCached(guildId: Long): Link? = links[guildId]
fun getLinkIfCached(guildId: Long): Link? = linkMap[guildId]

internal fun onNodeDisconnected(node: LavalinkNode) {
// Don't do anything if we are shutting down.
Expand All @@ -121,13 +132,13 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
}

if (nodes.size == 1) {
links.forEach { (_, link) ->
linkMap.forEach { (_, link) ->
link.state = LinkState.DISCONNECTED
}
return
}

links.forEach { (_, link) ->
linkMap.forEach { (_, link) ->
if (link.node == node) {
link.transferNode(loadBalancer.selectNode(region = null))
}
Expand Down

0 comments on commit b428c13

Please sign in to comment.