Skip to content

Commit

Permalink
Implement logical types conversion for serializer/deserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
charlescd authored and rlecomte committed Jul 23, 2024
1 parent c5b16ea commit 75ee139
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class GlueSchemaRegistryConfiguration {
private List<SerializationFeature> jacksonSerializationFeatures;
private List<DeserializationFeature> jacksonDeserializationFeatures;

private boolean logicalTypesConversionEnabled;

public GlueSchemaRegistryConfiguration(String region) {
Map<String, Object> config = new HashMap<>();
config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
Expand Down Expand Up @@ -104,6 +106,7 @@ private void buildSchemaRegistryConfigs(Map<String, ?> configs) {
validateAndSetUserAgent(configs);
validateAndSetSecondaryDeserializer(configs);
validateAndSetProxyUrl(configs);
validateAndSetLogicalTypesConversionEnabled(configs);
}

private void validateAndSetSecondaryDeserializer(Map<String, ?> configs) {
Expand All @@ -130,6 +133,12 @@ private void validateAndSetUserAgent(Map<String, ?> configs) {
}
}

/**
 * Reads the optional {@code logicalTypesConversionEnabled} flag from the user-supplied
 * configuration map. Accepts either a {@link Boolean} or its String form ("true"/"false"),
 * since properties-based configs commonly deliver values as Strings; a bare
 * {@code (Boolean)} cast would throw {@link ClassCastException} in that case.
 * Leaves the field at its default (false) when the key is absent.
 *
 * @param configs configuration key/value map supplied by the caller
 */
private void validateAndSetLogicalTypesConversionEnabled(Map<String, ?> configs) {
    if (isPresent(configs, AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED)) {
        Object value = configs.get(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED);
        if (value instanceof Boolean) {
            this.logicalTypesConversionEnabled = (Boolean) value;
        } else {
            // Tolerate String-typed config values (e.g. from .properties files).
            this.logicalTypesConversionEnabled = Boolean.parseBoolean(String.valueOf(value));
        }
    }
}

private void validateAndSetCompressionType(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.COMPRESSION_TYPE) && validateCompressionType(
(String) configs.get(AWSSchemaRegistryConstants.COMPRESSION_TYPE))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public final class AWSSchemaRegistryConstants {
*/
public static final String USER_AGENT_APP = "userAgentApp";

/**
* Boolean indicating whether logical types in Avro data should be converted.
*/
public static final String LOGICAL_TYPES_CONVERSION_ENABLED = "logicalTypesConversionEnabled";

/**
* Private constructor to avoid initialization of the class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize
@Setter
private AvroRecordType avroRecordType;

@Setter
private boolean logicalTypesConversionEnabled;

@NonNull
@Getter
@VisibleForTesting
Expand All @@ -65,6 +68,7 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize
public AvroDeserializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
this.avroRecordType = configs.getAvroRecordType();
this.logicalTypesConversionEnabled = configs.isLogicalTypesConversionEnabled();
this.datumReaderCache =
CacheBuilder
.newBuilder()
Expand Down Expand Up @@ -111,7 +115,7 @@ private BinaryDecoder getBinaryDecoder(byte[] data, int start, int end) {
private class DatumReaderCache extends CacheLoader<String, DatumReader<Object>> {
@Override
public DatumReader<Object> load(String schema) throws Exception {
return DatumReaderInstance.from(schema, avroRecordType);
return DatumReaderInstance.from(schema, avroRecordType, logicalTypesConversionEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DatumReaderInstance {
* @throws IllegalAccessException can be thrown readerClass.newInstance() from
* java.lang.Class implementation
*/
public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType)
public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled)
throws InstantiationException, IllegalAccessException {

Schema writerSchema = AVRO_UTILS.parseSchema(writerSchemaDefinition);
Expand All @@ -47,7 +47,11 @@ public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecord
case GENERIC_RECORD:
log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})",
writerSchema.toString());
return new GenericDatumReader<>(writerSchema);
if (logicalTypesConversionEnabled) {
return new GenericDatumReader<>(writerSchema, writerSchema, GenericDataWithLogicalTypesConversion.getInstance());
} else {
return new GenericDatumReader<>(writerSchema);
}

default:
String message = String.format("Unsupported AvroRecordType: %s",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.amazonaws.services.schemaregistry.deserializers.avro;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;

/**
 * Holder for a shared {@link GenericData} instance pre-configured with the standard
 * Avro logical type conversions: decimal, uuid, date, time (millis/micros) and
 * timestamp (local and zoned, millis/micros).
 *
 * The conversions are registered exactly once in the static initializer; callers
 * obtain the configured instance via {@link #getInstance()}.
 */
public final class GenericDataWithLogicalTypesConversion {
    private static final GenericData INSTANCE = new GenericData();

    static {
        INSTANCE.addLogicalTypeConversion(new Conversions.DecimalConversion());
        INSTANCE.addLogicalTypeConversion(new Conversions.UUIDConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.DateConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
    }

    /**
     * Private constructor to avoid initialization of the class.
     */
    private GenericDataWithLogicalTypesConversion() {
    }

    /**
     * @return the shared {@link GenericData} instance with logical type conversions registered
     */
    public static GenericData getInstance() {
        return INSTANCE;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public GlueSchemaRegistryDataFormatSerializer getInstance(@NonNull DataFormat da
@NonNull GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) {
switch (dataFormat) {
case AVRO:
this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer());
this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer(glueSchemaRegistryConfig));

log.debug("Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory");
return this.serializerMap.get(dataFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
Expand Down Expand Up @@ -45,12 +46,14 @@
public class AvroSerializer implements GlueSchemaRegistryDataFormatSerializer {
private AVROUtils avroUtils = AVROUtils.getInstance();
private static final long MAX_DATUM_WRITER_CACHE_SIZE = 100;
private final boolean logicalTypesConversionEnabled;

@NonNull
@VisibleForTesting
protected final LoadingCache<DatumWriterCacheKey, DatumWriter<Object>> datumWriterCache;

public AvroSerializer() {
public AvroSerializer(GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) {
this.logicalTypesConversionEnabled = glueSchemaRegistryConfig.isLogicalTypesConversionEnabled();
this.datumWriterCache =
CacheBuilder
.newBuilder()
Expand All @@ -61,7 +64,7 @@ public AvroSerializer() {
@Override
public byte[] serialize(Object data) {
byte[] bytes;
bytes = serialize(data, createDatumWriter(data));
bytes = serialize(data, createDatumWriter(data, logicalTypesConversionEnabled));

return bytes;
}
Expand All @@ -74,19 +77,19 @@ public byte[] serialize(Object data) {
* @param object the Avro message
* @return Avro datum writer for serialization
*/
private DatumWriter<Object> createDatumWriter(Object object) {
private DatumWriter<Object> createDatumWriter(Object object, boolean logicalTypesConversionEnabled) {
org.apache.avro.Schema schema = AVROUtils.getInstance()
.getSchema(object);
if (object instanceof SpecificRecord) {
return getSpecificDatumWriter(schema);
} else if (object instanceof GenericRecord) {
return getGenericDatumWriter(schema);
return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.EnumSymbol) {
return getGenericDatumWriter(schema);
return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.Array) {
return getGenericDatumWriter(schema);
return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.Fixed) {
return getGenericDatumWriter(schema);
return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else {
String message =
String.format("Unsupported type passed for serialization: %s", object);
Expand All @@ -96,13 +99,13 @@ private DatumWriter<Object> createDatumWriter(Object object) {

@SneakyThrows
private DatumWriter<Object> getSpecificDatumWriter(Schema schema) {
DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD);
DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD, false);
return datumWriterCache.get(datumWriterCacheKey);
}

@SneakyThrows
private DatumWriter<Object> getGenericDatumWriter(Schema schema) {
DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD);
private DatumWriter<Object> getGenericDatumWriter(Schema schema, boolean logicalTypesConversionEnabled) {
DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD, logicalTypesConversionEnabled);
return datumWriterCache.get(datumWriterCacheKey);
}

Expand Down Expand Up @@ -160,14 +163,16 @@ private static class DatumWriterCacheKey {
private final Schema schema;
@NonNull
private final AvroRecordType avroRecordType;
private final boolean logicalTypesConversionEnabled;
}

private static class DatumWriterCache extends CacheLoader<DatumWriterCacheKey, DatumWriter<Object>> {
@Override
public DatumWriter<Object> load(DatumWriterCacheKey datumWriterCacheKey) {
Schema schema = datumWriterCacheKey.getSchema();
AvroRecordType avroRecordType = datumWriterCacheKey.getAvroRecordType();
return DatumWriterInstance.get(schema, avroRecordType);
boolean logicalTypesConversionEnabled = datumWriterCacheKey.isLogicalTypesConversionEnabled();
return DatumWriterInstance.get(schema, avroRecordType, logicalTypesConversionEnabled);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

import com.amazonaws.services.schemaregistry.deserializers.avro.GenericDataWithLogicalTypesConversion;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.avro.Schema;
Expand All @@ -8,12 +9,16 @@
import org.apache.avro.specific.SpecificDatumWriter;

public class DatumWriterInstance {
public static DatumWriter<Object> get(Schema schema, AvroRecordType avroRecordType) {
public static DatumWriter<Object> get(Schema schema, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled) {
switch (avroRecordType) {
case SPECIFIC_RECORD:
return new SpecificDatumWriter<>(schema);
case GENERIC_RECORD:
return new GenericDatumWriter<>(schema);
if (logicalTypesConversionEnabled) {
return new GenericDatumWriter<>(schema, GenericDataWithLogicalTypesConversion.getInstance());
} else {
return new GenericDatumWriter<>(schema);
}
case UNKNOWN:
default:
String message =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.UUID;

import static com.amazonaws.services.schemaregistry.utils.RecordGenerator.AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -87,6 +88,7 @@ public class AvroDeserializerTest {
public void setup() {
this.configs.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test");
this.configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
this.configs.put(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED, true);
this.schemaRegistrySerDeConfigs = new GlueSchemaRegistryConfiguration(this.configs);

MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -298,6 +300,28 @@ public void testDeserialize_genericRecord_equalsOriginal(AWSSchemaRegistryConsta
assertEquals(1, avroDeserializer.getDatumReaderCache().size());
}

/**
 * Verifies that a serialized generic record containing logical types (date, decimal)
 * round-trips back to an equal generic record when logical type conversions are
 * enabled, for every supported compression type.
 */
@ParameterizedTest
@EnumSource(AWSSchemaRegistryConstants.COMPRESSION.class)
public void testDeserialize_genericRecord_with_logicalTypes_equalsOriginal(AWSSchemaRegistryConstants.COMPRESSION compressionType) {
    GenericRecord expectedRecord = RecordGenerator.createGenericAvroRecordWithLogicalTypes();
    org.apache.avro.Schema avroSchema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH);
    com.amazonaws.services.schemaregistry.common.Schema schemaObject =
            new com.amazonaws.services.schemaregistry.common.Schema(
                    avroSchema.toString(), DataFormat.AVRO.name(), "testAvroSchema");

    ByteBuffer serializedData = createBasicSerializedData(expectedRecord, compressionType.name(), DataFormat.AVRO);
    AvroDeserializer deserializer = createAvroDeserializer(AvroRecordType.GENERIC_RECORD);

    Object deserialized = deserializer.deserialize(serializedData, schemaObject);
    assertGenericRecord(expectedRecord, deserialized);
    // The datum reader built for this schema must be cached after first use.
    assertEquals(1, deserializer.getDatumReaderCache().size());
}

public void assertGenericRecord(GenericRecord genericRecord, Object deserializedObject) {
assertTrue(deserializedObject instanceof GenericRecord);
assertTrue(deserializedObject.equals(genericRecord));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.RecordGenerator;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

import java.util.HashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class AvroSerializerTest {

@Test
public void serialize_WhenSerializeIsCalled_ReturnsCachedInstance() {
AvroSerializer avroSerializer = new AvroSerializer();
GlueSchemaRegistryConfiguration config = new GlueSchemaRegistryConfiguration("eu-west-1");
AvroSerializer avroSerializer = new AvroSerializer(config);

User specificUserRecord = RecordGenerator.createSpecificAvroRecord();
GenericRecord genericUserRecord = RecordGenerator.createGenericUserMapAvroRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
Expand All @@ -42,6 +44,7 @@ public final class RecordGenerator {
public static final String AVRO_USER_ARRAY_STRING_SCHEMA_FILE = "src/test/resources/avro/user_array_String.avsc";
public static final String AVRO_USER_MAP_SCHEMA_FILE = "src/test/resources/avro/user_map.avsc";
public static final String AVRO_USER_MIXED_TYPE_SCHEMA_FILE = "src/test/resources/avro/user3.avsc";
public static final String AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH = "src/test/resources/avro/user4.avsc";
public static final String JSON_PERSON_SCHEMA_FILE_PATH =
"src/test/resources/json/schema/draft07/person.schema.json";
public static final String JSON_PERSON_DATA_FILE_PATH = "src/test/resources/json/person1.json";
Expand Down Expand Up @@ -322,6 +325,21 @@ public static GenericData.Record createGenericMultipleTypesAvroRecord() {
return genericRecordWithAllTypes;
}

/**
 * Test helper that builds a generic AVRO record whose fields use logical types
 * (a {@code date} and a {@code decimal}), matching the user4.avsc schema.
 *
 * @return Generic AVRO Record populated with logical-type values
 */
public static GenericRecord createGenericAvroRecordWithLogicalTypes() {
    Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH);
    GenericData.Record record = new GenericData.Record(schema);
    record.put("name", "Sylvestre");
    record.put("dateOfBirth", LocalDate.of(2021, 5, 1));
    record.put("age", new BigDecimal("1.56"));
    return record;
}

/**
* Helper method to create a test user object
*
Expand Down
10 changes: 10 additions & 0 deletions serializer-deserializer/src/test/resources/avro/user4.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"namespace": "com.amazonaws.services.schemaregistry.serializers.avro",
"type": "record",
"name": "User4",
"fields": [
{"name": "name", "type": "string" },
{"name": "dateOfBirth", "type": { "type": "int", "logicalType": "date"} },
{"name": "age", "type": { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2 } }
]
}

0 comments on commit 75ee139

Please sign in to comment.