Skip to content

Commit

Permalink
Pool byte arrays for charset decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Nov 24, 2022
1 parent 14247a2 commit 0dea861
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ public fun <T> Json.decodeFromStream(
deserializer: DeserializationStrategy<T>,
stream: InputStream
): T {
return decodeByReader(deserializer, JavaStreamSerialReader(stream))
val reader = JavaStreamSerialReader(stream)
try {
return decodeByReader(deserializer, reader)
} finally {
reader.release()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
*/
package kotlinx.serialization.json.internal

import java.util.concurrent.*
/*
* Not really documented kill switch as a workaround for potential
* (unlikely) problems with memory consumptions.
*/
private val MAX_CHARS_IN_POOL = runCatching {
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
}.getOrNull() ?: 1024 * 1024

internal open class CharArrayPoolBase {
private val arrays = ArrayDeque<CharArray>()
private var charsTotal = 0

/*
* Not really documented kill switch as a workaround for potential
* (unlikely) problems with memory consumptions.
*/
private val MAX_CHARS_IN_POOL = runCatching {
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
}.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)

protected fun take(size: Int): CharArray {
/*
* Initially the pool is empty, so an instance will be allocated
Expand Down Expand Up @@ -52,3 +50,40 @@ internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {
releaseImpl(array)
}
}

// Byte array pool

internal open class ByteArrayPoolBase {
private val arrays = ArrayDeque<kotlin.ByteArray>()
private var bytesTotal = 0

protected fun take(size: Int): ByteArray {
/*
* Initially the pool is empty, so an instance will be allocated
* and the pool will be populated in the 'release'
*/
val candidate = synchronized(this) {
arrays.removeLastOrNull()?.also { bytesTotal -= it.size / 2 }
}
return candidate ?: ByteArray(size)
}

protected fun releaseImpl(array: ByteArray): Unit = synchronized(this) {
if (bytesTotal + array.size >= MAX_CHARS_IN_POOL) return@synchronized
bytesTotal += array.size / 2
arrays.addLast(array)
}
}

internal object ByteArrayPool8k : ByteArrayPoolBase() {
fun take(): ByteArray = super.take(8196)

fun release(array: ByteArray) = releaseImpl(array)
}


internal object ByteArrayPool : ByteArrayPoolBase() {
fun take(): ByteArray = super.take(128)

fun release(array: ByteArray) = releaseImpl(array)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class CharsetReader(
decoder = charset.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
byteBuffer = ByteBuffer.allocate(32)
byteBuffer = ByteBuffer.wrap(ByteArrayPool8k.take())
byteBuffer.flip() // Make empty
}

Expand Down Expand Up @@ -117,4 +117,8 @@ internal class CharsetReader(
else -> error("Unreachable state: $bytesRead")
}
}

public fun release() {
ByteArrayPool8k.release(byteBuffer.array())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
}

internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
// NB: not closed on purpose, it is responsibility of the caller
// NB: not closed on purpose, it is the responsibility of the caller
private val reader = CharsetReader(stream, Charsets.UTF_8)

override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
return reader.read(buffer, bufferOffset, count)
}

fun release() {
reader.release()
}
}

0 comments on commit 0dea861

Please sign in to comment.