Skip to content

Commit

Permalink
Fix a memory leak in json tuple (#10403)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
(cherry picked from commit 73b3279)

Co-authored-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
gerashegalov and revans2 authored Feb 10, 2024
1 parent 2c969ad commit 09d4fd6
Showing 1 changed file with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,29 @@ case class GpuJsonTuple(children: Seq[Expression]) extends GpuGenerator
generatorOffset: Int,
outer: Boolean): Iterator[ColumnarBatch] = {
withRetry(inputBatches, splitSpillableInHalfByRows) { attempt =>
// Leak: the batch returned by attempt.getColumnarBatch() is never closed here; it needs withResource
val inputBatch = attempt.getColumnarBatch()

val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase
val schema = Array.fill[DataType](fieldExpressions.length)(StringType)

val fieldScalars = fieldExpressions.safeMap { field =>
withResourceIfAllowed(field.columnarEvalAny(inputBatch)) {
case fieldScalar: GpuScalar =>
// Special characters like '.', '[', ']' are not supported in field names
Scalar.fromString("$." + fieldScalar.getBase.getJavaString)
case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value")
withResource(attempt.getColumnarBatch()) { inputBatch =>
val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase
val schema = Array.fill[DataType](fieldExpressions.length)(StringType)

val fieldScalars = fieldExpressions.safeMap { field =>
withResourceIfAllowed(field.columnarEvalAny(inputBatch)) {
case fieldScalar: GpuScalar =>
// Special characters like '.', '[', ']' are not supported in field names
Scalar.fromString("$." + fieldScalar.getBase.getJavaString)
case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value")
}
}
}

withResource(fieldScalars) { fieldScalars =>
withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
withResource(fieldScalars) { fieldScalars =>
withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
}
}
}
Expand Down

0 comments on commit 09d4fd6

Please sign in to comment.