Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Add Bytes Serde (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdulac authored Oct 31, 2022
1 parent 4ce24be commit 883649e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/main/scala/zio/kafka/serde/Serdes.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package zio.kafka.serde

import java.nio.ByteBuffer
import java.util.UUID

import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes }
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes }
import org.apache.kafka.common.utils.Bytes
import zio.{ RIO, Task }

import java.nio.ByteBuffer
import java.util.UUID

private[zio] trait Serdes {
lazy val long: Serde[Any, Long] = convertPrimitiveSerde(KafkaSerdes.Long()).inmap(Long2long)(long2Long)
lazy val int: Serde[Any, Int] = convertPrimitiveSerde(KafkaSerdes.Integer()).inmap(Integer2int)(int2Integer)
Expand All @@ -15,6 +16,7 @@ private[zio] trait Serdes {
lazy val double: Serde[Any, Double] = convertPrimitiveSerde(KafkaSerdes.Double()).inmap(Double2double)(double2Double)
lazy val string: Serde[Any, String] = convertPrimitiveSerde(KafkaSerdes.String())
lazy val byteArray: Serde[Any, Array[Byte]] = convertPrimitiveSerde(KafkaSerdes.ByteArray())
lazy val bytes: Serde[Any, Bytes] = convertPrimitiveSerde(KafkaSerdes.Bytes())
lazy val byteBuffer: Serde[Any, ByteBuffer] = convertPrimitiveSerde(KafkaSerdes.ByteBuffer())
lazy val uuid: Serde[Any, UUID] = convertPrimitiveSerde(KafkaSerdes.UUID())

Expand Down
4 changes: 4 additions & 0 deletions src/test/scala/zio/kafka/serde/SerdeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import zio.test._
import scala.reflect.ClassTag

object SerdeSpec extends DefaultRunnableSpec {

private val anyBytes = Gen.listOf(Gen.anyByte).map(bytes => new org.apache.kafka.common.utils.Bytes(bytes.toArray))

override def spec = suite("Serde")(
testSerde(Serde.string, Gen.anyString),
testSerde(Serde.int, Gen.anyInt),
Expand All @@ -15,6 +18,7 @@ object SerdeSpec extends DefaultRunnableSpec {
testSerde(Serde.double, Gen.anyDouble),
testSerde(Serde.long, Gen.anyLong),
testSerde(Serde.uuid, Gen.anyUUID),
testSerde(Serde.bytes, anyBytes),
testSerde(Serde.byteArray, Gen.listOf(Gen.anyByte).map(_.toArray)),
suite("asOption")(
testM("serialize and deserialize None values to null and visa versa") {
Expand Down

0 comments on commit 883649e

Please sign in to comment.