diff --git a/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java b/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java
index ce7d870d..9b7e6ac2 100644
--- a/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java
+++ b/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java
@@ -68,6 +68,8 @@ public class GlueSchemaRegistryConfiguration {
     private List jacksonSerializationFeatures;
     private List jacksonDeserializationFeatures;
 
+    private boolean logicalTypesConversionEnabled;
+
     public GlueSchemaRegistryConfiguration(String region) {
         Map config = new HashMap<>();
         config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
@@ -104,6 +106,7 @@ private void buildSchemaRegistryConfigs(Map configs) {
         validateAndSetUserAgent(configs);
         validateAndSetSecondaryDeserializer(configs);
         validateAndSetProxyUrl(configs);
+        validateAndSetLogicalTypesConversionEnabled(configs);
     }
 
     private void validateAndSetSecondaryDeserializer(Map configs) {
@@ -130,6 +133,19 @@ private void validateAndSetUserAgent(Map configs) {
         }
     }
 
+    /**
+     * Reads the optional logical-types conversion flag from the supplied configs.
+     * The value is parsed from its string form, so both a {@code Boolean} and a
+     * {@code String} such as {@code "true"} are accepted; a direct cast to
+     * {@code Boolean} would throw a {@code ClassCastException} for the latter.
+     */
+    private void validateAndSetLogicalTypesConversionEnabled(Map configs) {
+        if (isPresent(configs, AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED)) {
+            this.logicalTypesConversionEnabled = Boolean.parseBoolean(
+                    String.valueOf(configs.get(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED)));
+        }
+    }
+
     private void validateAndSetCompressionType(Map configs) {
         if (isPresent(configs, AWSSchemaRegistryConstants.COMPRESSION_TYPE) && validateCompressionType(
                 (String) configs.get(AWSSchemaRegistryConstants.COMPRESSION_TYPE))) {
diff --git a/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java b/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java
index 5ce30b06..1138ddc4 100644
--- a/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java +++ b/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java @@ -172,6 +172,11 @@ public final class AWSSchemaRegistryConstants { */ public static final String USER_AGENT_APP = "userAgentApp"; + /** + * Boolean indicating if logical types in avro data must be converted or not. + */ + public static final String LOGICAL_TYPES_CONVERSION_ENABLED = "logicalTypesConversionEnabled"; + /** * Private constructor to avoid initialization of the class. */ diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java index d8f7b43b..c2e9108e 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java @@ -51,6 +51,9 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize @Setter private AvroRecordType avroRecordType; + @Setter + private boolean logicalTypesConversionEnabled; + @NonNull @Getter @VisibleForTesting @@ -65,6 +68,7 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize public AvroDeserializer(GlueSchemaRegistryConfiguration configs) { this.schemaRegistrySerDeConfigs = configs; this.avroRecordType = configs.getAvroRecordType(); + this.logicalTypesConversionEnabled = configs.isLogicalTypesConversionEnabled(); this.datumReaderCache = CacheBuilder .newBuilder() @@ -111,7 +115,7 @@ private BinaryDecoder getBinaryDecoder(byte[] data, int start, int end) { private class DatumReaderCache extends CacheLoader> { @Override public DatumReader load(String schema) throws Exception { - return 
DatumReaderInstance.from(schema, avroRecordType); + return DatumReaderInstance.from(schema, avroRecordType, logicalTypesConversionEnabled); } } } diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java index 718d7cd5..136706de 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java @@ -29,7 +29,7 @@ public class DatumReaderInstance { * @throws IllegalAccessException can be thrown readerClass.newInstance() from * java.lang.Class implementation */ - public static DatumReader from(String writerSchemaDefinition, AvroRecordType avroRecordType) + public static DatumReader from(String writerSchemaDefinition, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled) throws InstantiationException, IllegalAccessException { Schema writerSchema = AVRO_UTILS.parseSchema(writerSchemaDefinition); @@ -47,7 +47,11 @@ public static DatumReader from(String writerSchemaDefinition, AvroRecord case GENERIC_RECORD: log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})", writerSchema.toString()); - return new GenericDatumReader<>(writerSchema); + if (logicalTypesConversionEnabled) { + return new GenericDatumReader<>(writerSchema, writerSchema, GenericDataWithLogicalTypesConversion.getInstance()); + } else { + return new GenericDatumReader<>(writerSchema); + } default: String message = String.format("Unsupported AvroRecordType: %s", diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/GenericDataWithLogicalTypesConversion.java 
b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/GenericDataWithLogicalTypesConversion.java
new file mode 100644
index 00000000..8a232998
--- /dev/null
+++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/GenericDataWithLogicalTypesConversion.java
@@ -0,0 +1,46 @@
+package com.amazonaws.services.schemaregistry.deserializers.avro;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+
+/**
+ * Holder for a single shared {@link GenericData} model on which the decimal, UUID, date,
+ * time and timestamp logical type conversions listed below are registered.
+ *
+ * <p>The same instance is handed to every caller of {@link #getInstance()}, so callers should
+ * treat it as read-only and not register further conversions on it.
+ */
+@Slf4j
+public final class GenericDataWithLogicalTypesConversion {
+    private static final GenericData INSTANCE = new GenericData();
+
+    static {
+        // Register the conversions once, when the class is initialised.
+        INSTANCE.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        INSTANCE.addLogicalTypeConversion(new Conversions.UUIDConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.DateConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+    }
+
+    /**
+     * Static holder only; not meant to be instantiated.
+     */
+    private GenericDataWithLogicalTypesConversion() {
+    }
+
+    /**
+     * Returns the shared model with the logical type conversions registered.
+     *
+     * @return the shared {@link GenericData} instance
+     */
+    public static GenericData getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/GlueSchemaRegistrySerializerFactory.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/GlueSchemaRegistrySerializerFactory.java
index 01bb7065..83e28cc8 100644
--- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/GlueSchemaRegistrySerializerFactory.java
+++ 
b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/GlueSchemaRegistrySerializerFactory.java @@ -45,7 +45,7 @@ public GlueSchemaRegistryDataFormatSerializer getInstance(@NonNull DataFormat da @NonNull GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) { switch (dataFormat) { case AVRO: - this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer()); + this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer(glueSchemaRegistryConfig)); log.debug("Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory"); return this.serializerMap.get(dataFormat); diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializer.java index e9390216..cfda62d3 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializer.java @@ -15,6 +15,7 @@ package com.amazonaws.services.schemaregistry.serializers.avro; import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer; +import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration; import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.utils.AVROUtils; import com.amazonaws.services.schemaregistry.utils.AvroRecordType; @@ -45,12 +46,14 @@ public class AvroSerializer implements GlueSchemaRegistryDataFormatSerializer { private AVROUtils avroUtils = AVROUtils.getInstance(); private static final long MAX_DATUM_WRITER_CACHE_SIZE = 100; + private final boolean logicalTypesConversionEnabled; @NonNull @VisibleForTesting protected final LoadingCache> datumWriterCache; - public 
AvroSerializer() { + public AvroSerializer(GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) { + this.logicalTypesConversionEnabled = glueSchemaRegistryConfig.isLogicalTypesConversionEnabled(); this.datumWriterCache = CacheBuilder .newBuilder() @@ -61,7 +64,7 @@ public AvroSerializer() { @Override public byte[] serialize(Object data) { byte[] bytes; - bytes = serialize(data, createDatumWriter(data)); + bytes = serialize(data, createDatumWriter(data, logicalTypesConversionEnabled)); return bytes; } @@ -74,19 +77,19 @@ public byte[] serialize(Object data) { * @param object the Avro message * @return Avro datum writer for serialization */ - private DatumWriter createDatumWriter(Object object) { + private DatumWriter createDatumWriter(Object object, boolean logicalTypesConversionEnabled) { org.apache.avro.Schema schema = AVROUtils.getInstance() .getSchema(object); if (object instanceof SpecificRecord) { return getSpecificDatumWriter(schema); } else if (object instanceof GenericRecord) { - return getGenericDatumWriter(schema); + return getGenericDatumWriter(schema, logicalTypesConversionEnabled); } else if (object instanceof GenericData.EnumSymbol) { - return getGenericDatumWriter(schema); + return getGenericDatumWriter(schema, logicalTypesConversionEnabled); } else if (object instanceof GenericData.Array) { - return getGenericDatumWriter(schema); + return getGenericDatumWriter(schema, logicalTypesConversionEnabled); } else if (object instanceof GenericData.Fixed) { - return getGenericDatumWriter(schema); + return getGenericDatumWriter(schema, logicalTypesConversionEnabled); } else { String message = String.format("Unsupported type passed for serialization: %s", object); @@ -96,13 +99,13 @@ private DatumWriter createDatumWriter(Object object) { @SneakyThrows private DatumWriter getSpecificDatumWriter(Schema schema) { - DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD); + DatumWriterCacheKey 
datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD, false); return datumWriterCache.get(datumWriterCacheKey); } @SneakyThrows - private DatumWriter getGenericDatumWriter(Schema schema) { - DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD); + private DatumWriter getGenericDatumWriter(Schema schema, boolean logicalTypesConversionEnabled) { + DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD, logicalTypesConversionEnabled); return datumWriterCache.get(datumWriterCacheKey); } @@ -160,6 +163,7 @@ private static class DatumWriterCacheKey { private final Schema schema; @NonNull private final AvroRecordType avroRecordType; + private final boolean logicalTypesConversionEnabled; } private static class DatumWriterCache extends CacheLoader> { @@ -167,7 +171,8 @@ private static class DatumWriterCache extends CacheLoader load(DatumWriterCacheKey datumWriterCacheKey) { Schema schema = datumWriterCacheKey.getSchema(); AvroRecordType avroRecordType = datumWriterCacheKey.getAvroRecordType(); - return DatumWriterInstance.get(schema, avroRecordType); + boolean logicalTypesConversionEnabled = datumWriterCacheKey.isLogicalTypesConversionEnabled(); + return DatumWriterInstance.get(schema, avroRecordType, logicalTypesConversionEnabled); } } } diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/DatumWriterInstance.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/DatumWriterInstance.java index 450a164e..baeacf5b 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/DatumWriterInstance.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/avro/DatumWriterInstance.java @@ -1,5 +1,6 @@ package com.amazonaws.services.schemaregistry.serializers.avro; 
+import com.amazonaws.services.schemaregistry.deserializers.avro.GenericDataWithLogicalTypesConversion; import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import org.apache.avro.Schema; @@ -8,12 +9,16 @@ import org.apache.avro.specific.SpecificDatumWriter; public class DatumWriterInstance { - public static DatumWriter get(Schema schema, AvroRecordType avroRecordType) { + public static DatumWriter get(Schema schema, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled) { switch (avroRecordType) { case SPECIFIC_RECORD: return new SpecificDatumWriter<>(schema); case GENERIC_RECORD: - return new GenericDatumWriter<>(schema); + if (logicalTypesConversionEnabled) { + return new GenericDatumWriter<>(schema, GenericDataWithLogicalTypesConversion.getInstance()); + } else { + return new GenericDatumWriter<>(schema); + } case UNKNOWN: default: String message = diff --git a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java index 9271dd43..1fd457e3 100644 --- a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java +++ b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.UUID; +import static com.amazonaws.services.schemaregistry.utils.RecordGenerator.AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH; import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -87,6 +88,7 @@ public class AvroDeserializerTest { public void setup() { 
this.configs.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test"); this.configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2"); + this.configs.put(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED, true); this.schemaRegistrySerDeConfigs = new GlueSchemaRegistryConfiguration(this.configs); MockitoAnnotations.initMocks(this); @@ -298,6 +300,28 @@ public void testDeserialize_genericRecord_equalsOriginal(AWSSchemaRegistryConsta assertEquals(1, avroDeserializer.getDatumReaderCache().size()); } + /** + * Test whether the serialized generic record with logical types can be de-serialized back to the + * generic record instance with conversions. + */ + @ParameterizedTest + @EnumSource(AWSSchemaRegistryConstants.COMPRESSION.class) + public void testDeserialize_genericRecord_with_logicalTypes_equalsOriginal(AWSSchemaRegistryConstants.COMPRESSION compressionType) { + GenericRecord genericRecord = RecordGenerator.createGenericAvroRecordWithLogicalTypes(); + + ByteBuffer serializedData = createBasicSerializedData(genericRecord, compressionType.name(), DataFormat.AVRO); + org.apache.avro.Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH); + AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.GENERIC_RECORD); + + com.amazonaws.services.schemaregistry.common.Schema schemaObject = new com.amazonaws.services.schemaregistry.common.Schema( + schema.toString(), DataFormat.AVRO.name(), "testAvroSchema"); + + Object deserializedObject = avroDeserializer.deserialize(serializedData, schemaObject); + assertGenericRecord(genericRecord, deserializedObject); + //Assert the instance is getting cached. 
+ assertEquals(1, avroDeserializer.getDatumReaderCache().size()); + } + public void assertGenericRecord(GenericRecord genericRecord, Object deserializedObject) { assertTrue(deserializedObject instanceof GenericRecord); assertTrue(deserializedObject.equals(genericRecord)); diff --git a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializerTest.java b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializerTest.java index ec3492a2..22426ae8 100644 --- a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializerTest.java +++ b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/serializers/avro/AvroSerializerTest.java @@ -1,16 +1,21 @@ package com.amazonaws.services.schemaregistry.serializers.avro; +import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import com.amazonaws.services.schemaregistry.utils.RecordGenerator; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; +import java.util.HashMap; + import static org.junit.jupiter.api.Assertions.assertEquals; public class AvroSerializerTest { @Test public void serialize_WhenSerializeIsCalled_ReturnsCachedInstance() { - AvroSerializer avroSerializer = new AvroSerializer(); + GlueSchemaRegistryConfiguration config = new GlueSchemaRegistryConfiguration("eu-west-1"); + AvroSerializer avroSerializer = new AvroSerializer(config); User specificUserRecord = RecordGenerator.createSpecificAvroRecord(); GenericRecord genericUserRecord = RecordGenerator.createGenericUserMapAvroRecord(); diff --git a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/utils/RecordGenerator.java b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/utils/RecordGenerator.java index 
a0e494aa..687daab9 100644 --- a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/utils/RecordGenerator.java +++ b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/utils/RecordGenerator.java @@ -22,7 +22,9 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import java.math.BigDecimal; import java.time.Instant; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -42,6 +44,7 @@ public final class RecordGenerator { public static final String AVRO_USER_ARRAY_STRING_SCHEMA_FILE = "src/test/resources/avro/user_array_String.avsc"; public static final String AVRO_USER_MAP_SCHEMA_FILE = "src/test/resources/avro/user_map.avsc"; public static final String AVRO_USER_MIXED_TYPE_SCHEMA_FILE = "src/test/resources/avro/user3.avsc"; + public static final String AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH = "src/test/resources/avro/user4.avsc"; public static final String JSON_PERSON_SCHEMA_FILE_PATH = "src/test/resources/json/schema/draft07/person.schema.json"; public static final String JSON_PERSON_DATA_FILE_PATH = "src/test/resources/json/person1.json"; @@ -322,6 +325,21 @@ public static GenericData.Record createGenericMultipleTypesAvroRecord() { return genericRecordWithAllTypes; } + /** + * Test Helper method to generate a test GenericRecord with logical types + * + * @return Generic AVRO Record + */ + public static GenericRecord createGenericAvroRecordWithLogicalTypes() { + Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH); + GenericRecord genericRecord = new GenericData.Record(schema); + genericRecord.put("name", "Sylvestre"); + genericRecord.put("dateOfBirth", LocalDate.parse("2021-05-01")); + genericRecord.put("age", new BigDecimal("1.56")); + + return genericRecord; + } + /** * Helper method to create a test user object * diff --git 
a/serializer-deserializer/src/test/resources/avro/user4.avsc b/serializer-deserializer/src/test/resources/avro/user4.avsc new file mode 100644 index 00000000..582a4d30 --- /dev/null +++ b/serializer-deserializer/src/test/resources/avro/user4.avsc @@ -0,0 +1,10 @@ +{ + "namespace": "com.amazonaws.services.schemaregistry.serializers.avro", + "type": "record", + "name": "User4", + "fields": [ + {"name": "name", "type": "string" }, + {"name": "dateOfBirth", "type": { "type": "int", "logicalType": "date"} }, + {"name": "age", "type": { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2 } } + ] +} \ No newline at end of file