AWSKafkaAvroSerDe is clunky to use, create GenericRecordAWSKafkaAvroSerDe and SpecificRecordAWSKafkaAvroSerDe #311

Open
er1c opened this issue Nov 30, 2023 · 0 comments

er1c commented Nov 30, 2023

(Note: the examples below are Scala.)

If I have some Scala code like:

  val stream = builder.stream(topicName)(
    Consumed.`with`(keySerde, valueSerde)
  ).mapValues { v =>
    // The value arrives typed as Object, so it has to be cast before use.
    val rec = v.asInstanceOf[GenericRecord]
    rec
  }

it requires an explicit type cast, because AWSKafkaAvroSerDe implements Serde[Object] and the stream's values are therefore typed as Object.

It would be much more ergonomic to have a typed wrapper, something like:

package foo

import com.amazonaws.services.schemaregistry.kafkastreams.AWSKafkaAvroSerDe
import com.amazonaws.services.schemaregistry.utils.{AWSSchemaRegistryConstants, AvroRecordType}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import software.amazon.awssdk.services.glue.model.DataFormat

import scala.collection.JavaConverters._

object GenericRecordAWSKafkaAvroSerDe {
  private val formatProps = Map(
    AWSSchemaRegistryConstants.AVRO_RECORD_TYPE -> AvroRecordType.GENERIC_RECORD.getName,
    AWSSchemaRegistryConstants.DATA_FORMAT -> DataFormat.AVRO.name(),
  )
}

class GenericRecordAWSKafkaAvroSerDe extends Serde[GenericRecord] {
  private val inner: AWSKafkaAvroSerDe = new AWSKafkaAvroSerDe()

  override def serializer(): Serializer[GenericRecord] =
    inner.serializer().asInstanceOf[Serializer[GenericRecord]]

  override def deserializer(): Deserializer[GenericRecord] =
    inner.deserializer().asInstanceOf[Deserializer[GenericRecord]]

  /**
   * Configure the serializer and de-serializer wrapper.
   *
   * @param serdeConfig          configuration elements for the wrapper
   * @param isSerdeForRecordKeys true if key, false otherwise
   */
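  final override def configure(serdeConfig: java.util.Map[String, _], isSerdeForRecordKeys: Boolean): Unit = {
    // Serde.configure takes a java.util.Map; a scala Map parameter would only overload it.
    // Start from the Avro defaults, then let caller-supplied settings override them.
    val updatedConfig = new java.util.HashMap[String, Any](GenericRecordAWSKafkaAvroSerDe.formatProps.asJava)
    updatedConfig.putAll(serdeConfig)
    inner.serializer.configure(updatedConfig, isSerdeForRecordKeys)
    inner.deserializer.configure(updatedConfig, isSerdeForRecordKeys)
  }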
}

and a corresponding SpecificRecordAWSKafkaAvroSerDe for SpecificRecord.
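
For illustration, here is a minimal sketch of what the SpecificRecord counterpart could look like, mirroring the GenericRecord wrapper. The class name `SpecificRecordAWSKafkaAvroSerDe` and the type parameter `T` are hypothetical names from this proposal. The sketch assumes `AvroRecordType.SPECIFIC_RECORD` is the matching record type setting, and it takes a `java.util.Map` in `configure` so that the method overrides `Serde.configure` rather than overloading it.

```scala
import com.amazonaws.services.schemaregistry.kafkastreams.AWSKafkaAvroSerDe
import com.amazonaws.services.schemaregistry.utils.{AWSSchemaRegistryConstants, AvroRecordType}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import software.amazon.awssdk.services.glue.model.DataFormat

object SpecificRecordAWSKafkaAvroSerDe {
  // Defaults that pin the wrapped serde to Avro SpecificRecord handling.
  private val formatProps: java.util.Map[String, String] = {
    val m = new java.util.HashMap[String, String]()
    m.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName)
    m.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name())
    m
  }
}

// Hypothetical typed wrapper; T is meant to be a generated Avro class.
class SpecificRecordAWSKafkaAvroSerDe[T <: SpecificRecord] extends Serde[T] {
  private val inner: AWSKafkaAvroSerDe = new AWSKafkaAvroSerDe()

  // Unchecked casts: the inner serde works on Object, so T is not verified here.
  override def serializer(): Serializer[T] =
    inner.serializer().asInstanceOf[Serializer[T]]

  override def deserializer(): Deserializer[T] =
    inner.deserializer().asInstanceOf[Deserializer[T]]

  override def configure(serdeConfig: java.util.Map[String, _], isSerdeForRecordKeys: Boolean): Unit = {
    // Start from the defaults, then let caller-supplied settings override them.
    val updatedConfig = new java.util.HashMap[String, Any](SpecificRecordAWSKafkaAvroSerDe.formatProps)
    updatedConfig.putAll(serdeConfig)
    inner.serializer().configure(updatedConfig, isSerdeForRecordKeys)
    inner.deserializer().configure(updatedConfig, isSerdeForRecordKeys)
  }
}
```

Passing `new SpecificRecordAWSKafkaAvroSerDe[MyRecord]` as the value serde (where `MyRecord` is a hypothetical generated class) should give a stream whose values are already typed as `MyRecord`, so the `asInstanceOf` in `mapValues` goes away. As far as I know, a serde passed explicitly like this still needs `configure` called on it before use.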
