feat: avro endpoint
An endpoint that returns a serialized payload of generated data, based
on the provided schema
jgiovaresco committed Mar 2, 2023
1 parent 87a07a4 commit da8ac55
Showing 12 changed files with 431 additions and 9 deletions.
12 changes: 12 additions & 0 deletions app/build.gradle.kts
@@ -14,6 +14,14 @@ plugins {

repositories {
  mavenCentral()
  maven {
    url = uri("https://packages.confluent.io/maven/")
    name = "Confluent"
    content {
      includeGroup("io.confluent")
      includeGroup("org.apache.kafka")
    }
  }
}

scmVersion {
@@ -47,6 +55,10 @@ dependencies {
  implementation(libs.bundles.grpc)
  implementation(libs.bundles.logback)
  implementation(libs.bundles.rx)

  implementation(libs.avro)
  implementation(libs.kafka.serializer.avro)
  implementation(libs.kotlin.faker)
  implementation(libs.slf4j.api)

  testImplementation(libs.junit.jupiter.api)
@@ -0,0 +1,61 @@
package io.apim.samples.avro

import io.github.serpro69.kfaker.Faker
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericFixed
import org.apache.avro.generic.GenericRecord
import kotlin.random.Random

val faker = Faker()

fun generate(schema: Schema?): Any? = when (schema?.type) {
  Schema.Type.BOOLEAN -> faker.random.nextBoolean()
  Schema.Type.INT -> faker.random.nextInt()
  Schema.Type.LONG -> faker.random.nextLong()
  Schema.Type.FLOAT -> faker.random.nextFloat()
  Schema.Type.DOUBLE -> faker.random.nextDouble()
  Schema.Type.BYTES -> faker.random.randomString().toByteArray()
  Schema.Type.STRING -> faker.random.randomString()
  Schema.Type.RECORD -> newRecord(schema)
  Schema.Type.ENUM -> faker.random.randomValue(schema.enumSymbols)
  Schema.Type.ARRAY -> newArray(schema)
  Schema.Type.MAP -> newMap(schema)
  Schema.Type.UNION -> newUnion(schema)
  Schema.Type.FIXED -> newFixed(schema)
  Schema.Type.NULL -> null
  null -> null
}

private fun newRecord(schema: Schema): GenericRecord {
  val record = GenericData.Record(schema)

  schema.fields.forEach {
    record.put(it.name(), generate(it.schema()))
  }

  return record
}

private fun newArray(schema: Schema): List<Any?> {
  val list = mutableListOf<Any?>()
  repeat(3) { list.add(generate(schema.elementType)) }
  return list
}

private fun newMap(schema: Schema): Map<String, Any?> {
  val map = mutableMapOf<String, Any?>()
  repeat(3) { map[faker.random.randomString()] = generate(schema.valueType) }
  return map
}

private fun newUnion(schema: Schema): Any? {
  val selectedSchema = faker.random.randomValue(schema.types)
  return generate(selectedSchema)
}

private fun newFixed(schema: Schema): GenericFixed {
  val bytes = ByteArray(schema.fixedSize)
  Random.nextBytes(bytes)
  return GenericData.Fixed(schema, bytes)
}
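
A quick way to exercise the generator on its own. This sketch is illustrative and not part of the commit; the Payment schema below is borrowed from the tests further down.

import io.apim.samples.avro.generate
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

fun main() {
  // Illustrative schema, not taken from the commit itself.
  val schema = Schema.Parser().parse(
    """
    {
      "type": "record",
      "name": "Payment",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"}
      ]
    }
    """.trimIndent()
  )

  // Record schemas come back as GenericRecord with every field populated.
  val record = generate(schema) as GenericRecord
  println(record) // e.g. {"id": "kq3x...", "amount": 0.42} with random values
}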
7 changes: 7 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/AvroSerDe.kt
@@ -0,0 +1,7 @@
package io.apim.samples.avro

interface AvroSerDe {
  fun serialize(data: Any?): ByteArray

  fun deserialize(binary: ByteArray): Any?
}
19 changes: 19 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/AvroSerDeConfluent.kt
@@ -0,0 +1,19 @@
package io.apim.samples.avro

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.Schema

class AvroSerDeConfluent(private val schema: Schema) : AvroSerDe {
  override fun serialize(data: Any?): ByteArray {
    val serializer = KafkaAvroSerializer()
    serializer.configure(mapOf("schema.registry.url" to "mock://my-scope"), false)
    return serializer.serialize("topic", data)
  }

  override fun deserialize(binary: ByteArray): Any? {
    val deserializer = KafkaAvroDeserializer()
    deserializer.configure(mapOf("schema.registry.url" to "mock://my-scope"), false)
    return deserializer.deserialize("topic", binary, schema)
  }
}
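
The mock:// registry URL keeps the Confluent serializer entirely in-memory (no network call), but the resulting payload still carries the standard Confluent wire framing: a zero magic byte, a 4-byte big-endian schema id, then the raw Avro binary. A minimal sketch for inspecting that framing; the helper name is hypothetical, not part of the commit:

import java.nio.ByteBuffer

// Illustrative helper: split a Confluent-framed payload into its
// schema id and the raw Avro bytes that follow the 5-byte header.
fun inspectConfluentFrame(payload: ByteArray): Pair<Int, ByteArray> {
  require(payload[0] == 0.toByte()) { "Missing Confluent magic byte" }
  val schemaId = ByteBuffer.wrap(payload, 1, 4).int
  return schemaId to payload.copyOfRange(5, payload.size)
}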
9 changes: 9 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/AvroSerDeFactory.kt
@@ -0,0 +1,9 @@
package io.apim.samples.avro

import org.apache.avro.Schema

enum class SerializationFormat { CONFLUENT, SIMPLE, }

interface AvroSerDeFactory {
fun new(schema: Schema, format: SerializationFormat = SerializationFormat.SIMPLE): AvroSerDe
}
10 changes: 10 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/AvroSerDeFactoryImpl.kt
@@ -0,0 +1,10 @@
package io.apim.samples.avro

import org.apache.avro.Schema

class AvroSerDeFactoryImpl : AvroSerDeFactory {
  override fun new(schema: Schema, format: SerializationFormat): AvroSerDe = when (format) {
    SerializationFormat.SIMPLE -> AvroSerDeSimple(schema)
    SerializationFormat.CONFLUENT -> AvroSerDeConfluent(schema)
  }
}
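
A usage sketch for the factory; the wrapper function is hypothetical and the schema is assumed to be an already parsed org.apache.avro.Schema:

import io.apim.samples.avro.*
import org.apache.avro.Schema

// Hypothetical helper showing format selection.
fun buildSerdes(schema: Schema): Pair<AvroSerDe, AvroSerDe> {
  val factory = AvroSerDeFactoryImpl()
  val simple = factory.new(schema) // SerializationFormat.SIMPLE is the default
  val confluent = factory.new(schema, SerializationFormat.CONFLUENT) // adds registry framing
  return simple to confluent
}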
28 changes: 28 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/AvroSerDeSimple.kt
@@ -0,0 +1,28 @@
package io.apim.samples.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream

class AvroSerDeSimple(private val schema: Schema) : AvroSerDe {
  override fun serialize(data: Any?): ByteArray {
    val writer = GenericDatumWriter<Any>(schema)
    val output = ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(output, null)

    writer.write(data, encoder)
    encoder.flush()

    return output.toByteArray()
  }

  override fun deserialize(binary: ByteArray): Any? {
    val reader = GenericDatumReader<Any>(schema)
    val decoder = DecoderFactory.get().binaryDecoder(binary, null)

    return reader.read(null, decoder)
  }
}
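
A minimal round-trip sketch combining the generator with the simple serde, to show the two halves fit together. Illustrative only; the string schema is an assumption:

import io.apim.samples.avro.*
import org.apache.avro.Schema

fun main() {
  val schema = Schema.Parser().parse("""{"type": "string"}""")
  val serde = AvroSerDeFactoryImpl().new(schema) // SIMPLE by default

  val original = generate(schema)       // random string from the generator
  val bytes = serde.serialize(original) // raw Avro binary, no registry framing
  val decoded = serde.deserialize(bytes)

  // Avro decodes strings as Utf8, so compare the textual form.
  check(original.toString() == decoded.toString())
}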
51 changes: 51 additions & 0 deletions app/src/main/kotlin/io/apim/samples/rest/AvroHandler.kt
@@ -0,0 +1,51 @@
package io.apim.samples.rest

import io.apim.samples.avro.AvroSerDeFactoryImpl
import io.apim.samples.avro.SerializationFormat
import io.apim.samples.avro.generate
import io.vertx.core.http.HttpHeaders
import io.vertx.ext.web.impl.ParsableMIMEValue
import io.vertx.kotlin.core.json.obj
import io.vertx.rxjava3.core.buffer.Buffer
import io.vertx.rxjava3.ext.web.RoutingContext
import org.apache.avro.Schema
import org.apache.avro.SchemaParseException

val serdeFactory = AvroSerDeFactoryImpl()

fun avroHandler(ctx: RoutingContext) {
  val contentType = (ctx.parsedHeaders().delegate.contentType() as ParsableMIMEValue).forceParse()

  if (!contentType.isJson()) {
    ctx.sendError(400, "Provide an avro schema")
    return
  }

  try {
    val schema = Schema.Parser().parse(ctx.body().asString())
    val data = generate(schema)

    val format = ctx.queryParam("format").elementAtOrNull(0)?.let { SerializationFormat.valueOf(it) }
      ?: SerializationFormat.CONFLUENT
    val serde = serdeFactory.new(schema, format)

    ctx.response().statusCode = 200
    ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/*+avro")
    ctx.end(Buffer.buffer(serde.serialize(data))).subscribe()
  } catch (e: SchemaParseException) {
    ctx.sendError(400, "Invalid avro schema", e.message)
    return
  }
}

private fun RoutingContext.sendError(statusCode: Int, title: String, detail: String? = null) {
  this.response().statusCode = statusCode
  this.end(io.vertx.kotlin.core.json.json {
    obj(
      "title" to title,
      "detail" to detail
    )
  }.toString()).subscribe()
}
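
How the endpoint is meant to be called, as a sketch: POST an Avro schema as JSON and get the serialized random payload back. The host, port, and the vertx-web-client module are assumptions for illustration, not part of the commit:

import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.ext.web.client.WebClient

fun main() {
  val vertx = Vertx.vertx()
  val client = WebClient.create(vertx)

  // Any valid Avro schema works; this one is illustrative.
  val schemaJson = """{"type": "record", "name": "Payment", "fields": [{"name": "id", "type": "string"}]}"""

  // Host and port are assumptions; use whatever the verticle listens on.
  client.post(8888, "localhost", "/avro")
    .addQueryParam("format", "SIMPLE") // omit to get the CONFLUENT default
    .putHeader("Content-Type", "application/json")
    .sendBuffer(Buffer.buffer(schemaJson)) { ar ->
      if (ar.succeeded()) {
        println("Got ${ar.result().body().length()} Avro bytes")
      } else {
        ar.cause().printStackTrace()
      }
      vertx.close()
    }
}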
@@ -38,6 +38,7 @@ class RestServerVerticle(
  private fun router(): Router = Router.router(vertx).let { router ->
    router.route().handler(BodyHandler.create())
    router.route("/echo").handler(::echoHandler)
    router.route("/avro").handler(::avroHandler)
    router.route("/grpc*").handler(::protobufFileHandler)
    router.route("/health*").handler(HealthCheckHandler.createWithHealthChecks(healthChecks))
    router
@@ -0,0 +1,173 @@
package io.apim.samples.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericFixed
import org.apache.avro.generic.GenericRecord
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import strikt.api.expectThat
import strikt.assertions.*

class AvroGenericDataGeneratorTest {

  @Nested
  inner class Primitives {

    @Test
    fun `generate from boolean schema`() {
      val schema = Schema.Parser().parse("""{"type": "boolean"}""")
      expectThat(generate(schema)).isNotNull().isA<Boolean>()
    }

    @Test
    fun `generate from null schema`() {
      val schema = Schema.Parser().parse("""{"type": "null"}""")
      expectThat(generate(schema)).isNull()
      expectThat(generate(null)).isNull()
    }

    @Test
    fun `generate from int schema`() {
      val schema = Schema.Parser().parse("""{"type": "int"}""")
      expectThat(generate(schema)).isNotNull().isA<Int>()
    }

    @Test
    fun `generate from long schema`() {
      val schema = Schema.Parser().parse("""{"type": "long"}""")
      expectThat(generate(schema)).isNotNull().isA<Long>()
    }

    @Test
    fun `generate from float schema`() {
      val schema = Schema.Parser().parse("""{"type": "float"}""")
      expectThat(generate(schema)).isNotNull().isA<Float>()
    }

    @Test
    fun `generate from double schema`() {
      val schema = Schema.Parser().parse("""{"type": "double"}""")
      expectThat(generate(schema)).isNotNull().isA<Double>()
    }

    @Test
    fun `generate from bytes schema`() {
      val schema = Schema.Parser().parse("""{"type": "bytes"}""")
      expectThat(generate(schema)).isNotNull().isA<ByteArray>()
    }

    @Test
    fun `generate from string schema`() {
      val schema = Schema.Parser().parse("""{"type": "string"}""")
      expectThat(generate(schema)).isNotNull().isA<String>()
    }
  }

  @Nested
  inner class Record {

    @Test
    fun `generate from record schema`() {
      val schema = Schema.Parser().parse(
        """
        {
          "type": "record",
          "name": "Payment",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "amount",
              "type": "double"
            }
          ]
        }
        """.trimIndent()
      )
      expectThat(generate(schema)).isNotNull().isA<GenericRecord>().and {
        get { get("id") }.isNotNull().isA<String>()
        get { get("amount") }.isNotNull().isA<Double>()
      }
    }
  }

  @Nested
  inner class Enum {
    @Test
    fun `generate from enum schema`() {
      val expectedValues = listOf("SPADES", "HEARTS", "DIAMONDS", "CLUBS")
      val schema = Schema.Parser().parse(
        """
        {
          "type": "enum",
          "name": "Suit",
          "symbols" : [${expectedValues.joinToString(",") { "\"$it\"" }}]
        }
        """.trimIndent()
      )
      expectThat(generate(schema)).isNotNull().isA<String>().isContainedIn(expectedValues)
    }
  }

  @Nested
  inner class Arrays {
    @Test
    fun `generate from string array schema`() {
      val schema = Schema.Parser().parse(
        """
        {
          "type": "array",
          "items" : "string",
          "default": []
        }
        """.trimIndent()
      )
      expectThat(generate(schema)).isNotNull().isA<List<String>>().hasSize(3)
    }
  }

  @Nested
  inner class Maps {
    @Test
    fun `generate from long value map schema`() {
      val schema = Schema.Parser().parse(
        """
        {
          "type": "map",
          "values" : "long",
          "default": {}
        }
        """.trimIndent()
      )
      expectThat(generate(schema)).isNotNull().isA<Map<String, Long>>().hasSize(3)
    }
  }

  @Nested
  inner class Unions {
    @Test
    fun `generate from union schema`() {
      val schema = Schema.Parser().parse("""["null", "string"]""")

      repeat(10) {
        val result = generate(schema)

        val isNull = result == null
        val isString = result is String

        expectThat(isNull || isString).describedAs("Expecting a null or string but was '$result'").isTrue()
      }
    }
  }

  @Nested
  inner class Fixed {
    @Test
    fun `generate from fixed schema`() {
      val schema = Schema.Parser().parse("""{"type": "fixed", "size": 16, "name": "md5"}""")

      val result = generate(schema)
      expectThat(result).isNotNull().isA<GenericFixed>().and {
        get { bytes().asList() }.hasSize(16)
      }
    }
  }
}