Skip to content

Commit

Permalink
update node penalty naming to be more inline with function, break it …
Browse files Browse the repository at this point in the history
…out into an interface to allow for overriding.
  • Loading branch information
kikkia committed Mar 27, 2024
1 parent c345d20 commit 76103de
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import dev.arbjerg.lavalink.client.player.Track
import dev.arbjerg.lavalink.client.player.toCustom
import dev.arbjerg.lavalink.internal.*
import dev.arbjerg.lavalink.internal.error.RestException
import dev.arbjerg.lavalink.internal.loadbalancing.Penalties
import dev.arbjerg.lavalink.client.loadbalancing.builtin.DefaultNodeHealthProvider
import dev.arbjerg.lavalink.client.loadbalancing.builtin.INodeHealthProvider
import dev.arbjerg.lavalink.internal.toLavalinkPlayer
import dev.arbjerg.lavalink.protocol.v4.*
import kotlinx.serialization.DeserializationStrategy
Expand Down Expand Up @@ -57,7 +58,7 @@ class LavalinkNode(
val ws = LavalinkSocket(this)

// Stuff for load balancing
val penalties = Penalties(this)
var nodeHealth: INodeHealthProvider = nodeOptions.nodeHealthProvider ?: DefaultNodeHealthProvider(this)
var stats: Stats? = null
internal set
var available: Boolean = false
Expand Down
9 changes: 9 additions & 0 deletions src/main/kotlin/dev/arbjerg/lavalink/client/NodeOptions.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package dev.arbjerg.lavalink.client

import dev.arbjerg.lavalink.client.loadbalancing.IRegionFilter
import dev.arbjerg.lavalink.client.loadbalancing.builtin.INodeHealthProvider
import dev.arbjerg.lavalink.internal.TIMEOUT_MS
import java.net.URI

data class NodeOptions private constructor(val name: String,
val serverUri: URI,
val password: String,
val regionFilter: IRegionFilter?,
val nodeHealthProvider: INodeHealthProvider?,
val httpTimeout: Long) {
data class Builder(
private var name: String? = null,
private var serverUri: URI? = null,
private var password: String? = null,
private var regionFilter: IRegionFilter? = null,
private var nodeHealthProvider: INodeHealthProvider? = null,
private var httpTimeout: Long = TIMEOUT_MS,
) {
fun setName(name: String) = apply { this.name = name }
Expand All @@ -39,6 +42,11 @@ data class NodeOptions private constructor(val name: String,
*/
fun setRegionFilter(regionFilter: IRegionFilter?) = apply { this.regionFilter = regionFilter }

/**
* Sets a custom node health provider for the node. Used in loadbalancing/traffic routing (Default: none)
*/
fun setNodeHealthProvider(nodeHealthProvider: INodeHealthProvider?) = apply { this.nodeHealthProvider = nodeHealthProvider }

/**
* Sets the http total call timeout. (Default: 10000ms)
* @param httpTimeout - timeout in ms
Expand All @@ -55,6 +63,7 @@ data class NodeOptions private constructor(val name: String,
serverUri!!,
password!!,
regionFilter,
nodeHealthProvider,
httpTimeout)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DefaultLoadBalancer(private val client: LavalinkClient) : ILoadBalancer {

// TODO: Probably should enforce that no nodes go above the max
return nodes.filter { it.available }.minByOrNull { node ->
node.penalties.calculateTotal() + penaltyProviders.sumOf { it.getPenalty(node, region) }
node.nodeHealth.calculateTotalHealthPenalty() + penaltyProviders.sumOf { it.getPenalty(node, region) }
} ?: throw IllegalStateException("No available nodes!")
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package dev.arbjerg.lavalink.internal.loadbalancing
package dev.arbjerg.lavalink.client.loadbalancing.builtin

import dev.arbjerg.lavalink.client.LavalinkNode
import dev.arbjerg.lavalink.client.loadbalancing.MAX_ERROR
import dev.arbjerg.lavalink.internal.loadbalancing.MetricService
import dev.arbjerg.lavalink.internal.loadbalancing.MetricType
import dev.arbjerg.lavalink.protocol.v4.Message
import kotlin.math.pow

Expand All @@ -13,10 +15,10 @@ import kotlin.math.pow
// Tracks stuck per minute
// Track exceptions per minute
// loads failed per minute
data class Penalties(val node: LavalinkNode) {
data class DefaultNodeHealthProvider(val node: LavalinkNode): INodeHealthProvider {
private val metricService = MetricService()

fun handleTrackEvent(event: Message.EmittedEvent) {
override fun handleTrackEvent(event: Message.EmittedEvent) {
when (event) {
is Message.EmittedEvent.TrackStartEvent -> {
metricService.trackMetric(MetricType.LOAD_ATTEMPT)
Expand All @@ -42,7 +44,7 @@ data class Penalties(val node: LavalinkNode) {
}
}

fun calculateTotal(): Int {
override fun calculateTotalHealthPenalty(): Int {
val stats = node.stats

if (!node.available || stats == null) {
Expand Down Expand Up @@ -87,4 +89,13 @@ data class Penalties(val node: LavalinkNode) {

return playerPenalty + cpuPenalty + deficitFramePenalty + nullFramePenalty + trackStuckPenalty + trackExceptionPenalty + loadFailedPenalty
}

override fun isHealthy(): Boolean {
val metrics = metricService.getCurrentMetrics()
val loadsAttempted = metrics[MetricType.LOAD_ATTEMPT] ?: 0
val loadsFailed = metrics[MetricType.LOAD_FAILED] ?: 0

// When the node fails to load anything, we consider it to be unhealthy
return !(loadsAttempted > 0 && loadsAttempted == loadsFailed) && node.available
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.arbjerg.lavalink.client.loadbalancing.builtin

import dev.arbjerg.lavalink.client.loadbalancing.MAX_ERROR
import dev.arbjerg.lavalink.protocol.v4.Message

interface INodeHealthProvider {
/**
* Called for each event on the node.
*/
fun handleTrackEvent(event: Message.EmittedEvent)

/**
* Calculate the penalty for the node based off of its health.
*
* Return value should never exceed [MAX_ERROR]. Lower means to take preference.
*
* @return A number between 0 and [MAX_ERROR] (inclusive), using numbers outside of this range may cause errors.
*/
fun calculateTotalHealthPenalty(): Int

/**
* Gives a simple answer if the node is considered healthy.
*
* @return true if the node is in a healthy state
*/
fun isHealthy() : Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos
else -> {}
}

node.penalties.handleTrackEvent(event)
node.nodeHealth.handleTrackEvent(event)
}

else -> {
Expand Down

0 comments on commit 76103de

Please sign in to comment.