Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further improve stream decoding performance #2101

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
public fun <T> Json.decodeFromStream(
    deserializer: DeserializationStrategy<T>,
    stream: InputStream
): T {
    // The reader borrows a pooled byte buffer; it is handed back in `finally`
    // so that a failing decode does not leave the buffer out of the pool.
    val reader = JavaStreamSerialReader(stream)
    try {
        return decodeByReader(deserializer, reader)
    } finally {
        reader.release()
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
*/
package kotlinx.serialization.json.internal

import java.util.concurrent.*
/*
 * Budget that is consulted when an array is handed back to a pool; defaults to
 * 2 * 1024 * 1024.
 *
 * The `kotlinx.serialization.json.pool.size` system property is a not really
 * documented kill switch, kept as a workaround for potential (unlikely) problems
 * with memory consumption. An unset or non-numeric property, as well as any
 * failure while reading it, falls back to the default.
 */
private val MAX_CHARS_IN_POOL = runCatching {
    // Safe call: getProperty returns null for an unset property, which should
    // select the default directly instead of going through a thrown exception.
    System.getProperty("kotlinx.serialization.json.pool.size")?.toIntOrNull()
}.getOrNull() ?: 2 * 1024 * 1024

internal open class CharArrayPoolBase {
private val arrays = ArrayDeque<CharArray>()
private var charsTotal = 0

/*
* Not really documented kill switch as a workaround for potential
* (unlikely) problems with memory consumptions.
*/
private val MAX_CHARS_IN_POOL = runCatching {
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
}.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)

protected fun take(size: Int): CharArray {
/*
* Initially the pool is empty, so an instance will be allocated
Expand Down Expand Up @@ -52,3 +50,40 @@ internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {
releaseImpl(array)
}
}

// Byte array pool

internal open class ByteArrayPoolBase {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rewrite this code to remove most of the synchronized calls and speed it up?

internal class ByteArrayPoolBase(val size: Int) {

// A new pool instance is created in every Kotlin object, so super.take and the protected overload are not needed; alternatively, change this to a global static array / Kotlin object.
      private val arrays = ArrayDeque<kotlin.ByteArray>()
      private val bufferSize =size / 2
// bytesCount is private and not used in code 

 fun take(): ByteArray =
        if (array.isEmpty) { // try check for empty without synchronized
               ByteArray(size)
        } else {
            synchronized(this) {
                arrays.removeLastOrNull() 
            } ?: ByteArray(size)
        }        

}
    fun release(array: ByteArray): Unit {
             // Reading arrays.size is atomic. If a data race does happen, we only temporarily add one extra element to the pool, so synchronized is not needed for this check. MAX_CHARS_IN_POOL is the pool size for chars; is the limit the same for byte buffers?
             if (arrays.size * bufferSize  < MAX_CHARS_IN_POOL) 
                 synchronized(this) {
                       arrays.addLast(array)
                }
             
    }


}

internal object ByteArrayPool8k : ByteArrayPoolBase(8196)
internal object ByteArrayPool : ByteArrayPoolBase(512)


Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, ByteArray and CharArray could be replaced with a generic type parameter T :)

private val arrays = ArrayDeque<kotlin.ByteArray>()
private var bytesTotal = 0

protected fun take(size: Int): ByteArray {
    // The pool starts out empty and is only filled by 'releaseImpl', so the first
    // callers always end up with a freshly allocated array of the requested size.
    // A pooled array is returned as is, whatever its length.
    val pooled: ByteArray? = synchronized(this) {
        val last = arrays.removeLastOrNull()
        // Accounting is kept in half-size units, mirroring what 'releaseImpl' adds.
        if (last != null) bytesTotal -= last.size / 2
        last
    }
    if (pooled != null) return pooled
    return ByteArray(size)
}

/*
 * Puts the array back into the pool unless doing so would reach MAX_CHARS_IN_POOL.
 * An array that does not fit is simply dropped.
 */
protected fun releaseImpl(array: ByteArray): Unit = synchronized(this) {
    // The running total is tracked in half-size units (see the matching
    // subtraction in 'take'), so the admission check must use the same unit.
    val accounted = array.size / 2
    if (bytesTotal + accounted >= MAX_CHARS_IN_POOL) return@synchronized
    bytesTotal += accounted
    arrays.addLast(array)
}
}

/**
 * Pool whose freshly allocated arrays hold 8196 bytes.
 *
 * NOTE(review): the size is 8196 rather than 8192 — confirm whether that is intentional.
 */
internal object ByteArrayPool8k : ByteArrayPoolBase() {
    /** Returns a pooled array if one is available, otherwise a new 8196-byte array. */
    fun take(): ByteArray {
        return super.take(8196)
    }

    /** Offers [array] back to this pool. */
    fun release(array: ByteArray) {
        releaseImpl(array)
    }
}


/** Pool whose freshly allocated arrays hold 512 bytes. */
internal object ByteArrayPool : ByteArrayPoolBase() {
    /** Returns a pooled array if one is available, otherwise a new 512-byte array. */
    fun take(): ByteArray {
        return super.take(512)
    }

    /** Offers [array] back to this pool. */
    fun release(array: ByteArray) {
        releaseImpl(array)
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package kotlinx.serialization.json.internal

import java.io.*
import java.nio.*
import java.nio.charset.*

/**
 * Reads chars from [inputStream] by decoding its bytes with [charset].
 *
 * Bytes are staged in an array borrowed from [ByteArrayPool8k]. Malformed and
 * unmappable input is replaced instead of being reported as an error.
 * Call [release] to hand the staging array back to the pool.
 */
internal class CharsetReader(
    private val inputStream: InputStream,
    private val charset: Charset
) {
    private val decoder: CharsetDecoder
    private val byteBuffer: ByteBuffer

    // Surrogate-handling in cases when a single char is requested, but two were read
    private var hasLeftoverPotentiallySurrogateChar = false
    private var leftoverChar = 0.toChar()

    init {
        decoder = charset.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE)
        byteBuffer = ByteBuffer.wrap(ByteArrayPool8k.take())
        byteBuffer.flip() // Make empty
    }

    /**
     * Reads up to [length] chars into [array], starting at [offset].
     *
     * @return the number of chars stored, `0` when [length] is `0`, or `-1` when the
     * input is exhausted and nothing was stored
     * @throws IllegalArgumentException when [offset] and [length] do not describe a
     * range inside [array]
     */
    @Suppress("NAME_SHADOWING")
    fun read(array: CharArray, offset: Int, length: Int): Int {
        if (length == 0) return 0
        require(offset in 0 until array.size && length >= 0 && offset + length <= array.size) {
            "Unexpected arguments: $offset, $length, ${array.size}"
        }

        var offset = offset
        var length = length
        // Chars already stored by this call (0 or 1, from the leftover char)
        var charsRead = 0
        if (hasLeftoverPotentiallySurrogateChar) {
            array[offset] = leftoverChar
            offset++
            length--
            hasLeftoverPotentiallySurrogateChar = false
            charsRead = 1
            if (length == 0) return charsRead
        }
        if (length == 1) {
            // Treat single-character array reads just like read()
            val c = oneShotReadSlowPath()
            if (c == -1) return if (charsRead == 0) -1 else charsRead
            array[offset] = c.toChar()
            return charsRead + 1
        }
        val decoded = doRead(array, offset, length)
        return when {
            decoded != -1 -> decoded + charsRead
            // End of input: -1 must not be added to the leftover count, otherwise a
            // stored leftover char would be reported as "0 chars read" and get lost.
            charsRead == 0 -> -1
            else -> charsRead
        }
    }

    /**
     * Decodes into [array] until the target range is full or the input ends.
     *
     * @return the number of chars decoded, or `-1` when none were produced
     */
    private fun doRead(array: CharArray, offset: Int, length: Int): Int {
        var charBuffer = CharBuffer.wrap(array, offset, length)
        if (charBuffer.position() != 0) {
            // Slice so that position() counts the chars written by this call
            charBuffer = charBuffer.slice()
        }
        var isEof = false
        while (true) {
            val cr = decoder.decode(byteBuffer, charBuffer, isEof)
            if (cr.isUnderflow) {
                if (isEof) break
                if (!charBuffer.hasRemaining()) break
                val n = fillByteBuffer()
                if (n < 0) {
                    isEof = true
                    if (charBuffer.position() == 0 && !byteBuffer.hasRemaining()) break
                    decoder.reset()
                }
                continue
            }
            if (cr.isOverflow) {
                assert(charBuffer.position() > 0)
                break
            }
            cr.throwException()
        }
        if (isEof) decoder.reset()
        return if (charBuffer.position() == 0) -1
        else charBuffer.position()
    }

    /**
     * Compacts the staging buffer and reads more bytes from [inputStream] into it.
     *
     * @return the number of bytes available for decoding afterwards, or the negative
     * value returned by the stream when it has no more data
     */
    private fun fillByteBuffer(): Int {
        byteBuffer.compact()
        try {
            // Read from the input stream, and then update the buffer
            val limit = byteBuffer.limit()
            val position = byteBuffer.position()
            val remaining = if (position <= limit) limit - position else 0
            val bytesRead = inputStream.read(byteBuffer.array(), byteBuffer.arrayOffset() + position, remaining)
            if (bytesRead < 0) return bytesRead
            byteBuffer.position(position + bytesRead)
        } finally {
            // Always switch the buffer back to draining mode, even on EOF or failure
            byteBuffer.flip()
        }
        return byteBuffer.remaining()
    }

    /**
     * Reads a single char by decoding up to two chars and stashing the second one.
     *
     * @return the char code, or `-1` when the input is exhausted
     */
    private fun oneShotReadSlowPath(): Int {
        // Return the leftover char, if there is one
        if (hasLeftoverPotentiallySurrogateChar) {
            hasLeftoverPotentiallySurrogateChar = false
            return leftoverChar.code
        }

        val array = CharArray(2)
        val charsRead = read(array, 0, 2)
        return when (charsRead) {
            -1 -> -1
            1 -> array[0].code
            2 -> {
                leftoverChar = array[1]
                hasLeftoverPotentiallySurrogateChar = true
                array[0].code
            }
            else -> error("Unreachable state: $charsRead")
        }
    }

    /** Hands the staging array back to [ByteArrayPool8k]. */
    fun release() {
        ByteArrayPool8k.release(byteBuffer.array())
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kotlinx.serialization.json.internal

import java.io.InputStream
import java.io.OutputStream
import java.nio.charset.Charset

internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWriter {
private val buffer = ByteArrayPool.take()
Expand Down Expand Up @@ -255,9 +254,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
}

/**
 * [SerialReader] that decodes UTF-8 chars from an [InputStream] through a [CharsetReader].
 */
internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
    // NB: not closed on purpose, it is the responsibility of the caller
    private val reader = CharsetReader(stream, Charsets.UTF_8)

    /** Delegates to [CharsetReader.read] and returns its result unchanged. */
    override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
        return reader.read(buffer, bufferOffset, count)
    }

    /** Releases the resources held by the underlying [CharsetReader]; the stream itself is left open. */
    fun release() {
        reader.release()
    }
}