Skip to content

Commit

Permalink
GH-2992: Gate LocalTimestamp references in AvroSchemaConverter (#2993)
Browse files Browse the repository at this point in the history
* PARQUET-2992: Gate LocalTimestamp references in AvroSchemaConverter

* GH-2992: Test logical type conversion for different Avro versions
  • Loading branch information
clairemcginty committed Aug 21, 2024
1 parent d4384d3 commit 312a15f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.Optional.of;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroRecordConverter.getRuntimeAvroVersion;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
Expand Down Expand Up @@ -488,15 +489,20 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
return timeType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return timestampType(true, MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return timestampType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return timestampType(false, MICROS);
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
return uuidType();
}

if (avroVersionSupportsLocalTimestampTypes()) {
if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return timestampType(false, MICROS);
}
}

return null;
}

Expand Down Expand Up @@ -538,7 +544,7 @@ public Optional<LogicalType> visit(
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();

if (isAdjustedToUTC) {
if (isAdjustedToUTC || !avroVersionSupportsLocalTimestampTypes()) {
switch (unit) {
case MILLIS:
return of(LogicalTypes.timestampMillis());
Expand Down Expand Up @@ -605,4 +611,14 @@ private static String namespace(String name, Map<String, Integer> names) {
Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
return nameCount > 1 ? name + nameCount : null;
}

/* Avro <= 1.9 does not support conversions to LocalTimestamp{Micros, Millis} classes */
private static boolean avroVersionSupportsLocalTimestampTypes() {
final String avroVersion = getRuntimeAvroVersion();

return avroVersion == null
|| !(avroVersion.startsWith("1.7.")
|| avroVersion.startsWith("1.8.")
|| avroVersion.startsWith("1.9."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.CALLS_REAL_METHODS;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.util.Arrays;
Expand All @@ -53,19 +55,34 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest(AvroRecordConverter.class)
public class TestAvroSchemaConverter {

private static final Configuration NEW_BEHAVIOR = new Configuration(false);

@Before
public void setupMockito() {
PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS);
}

@BeforeClass
public static void setupConf() {
NEW_BEHAVIOR.setBoolean("parquet.avro.add-list-element-records", false);
Expand Down Expand Up @@ -665,6 +682,27 @@ public void testTimestampMillisType() throws Exception {
testRoundTripConversion(
expected, "message myrecord {\n" + " required int64 timestamp (TIMESTAMP(MILLIS,true));\n" + "}\n");

// Test that conversions for timestamp types only use APIs that are available in the user's Avro version
for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) {
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion);
final Schema converted = new AvroSchemaConverter()
.convert(Types.buildMessage()
.addField(Types.primitive(INT64, Type.Repetition.REQUIRED)
.as(LogicalTypeAnnotation.timestampType(
false, LogicalTypeAnnotation.TimeUnit.MILLIS))
.length(1)
.named("timestamp_type"))
.named("TestAvro"));

assertEquals(
avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-millis" : "local-timestamp-millis",
converted
.getField("timestamp_type")
.schema()
.getLogicalType()
.getName());
}

for (PrimitiveTypeName primitive :
new PrimitiveTypeName[] {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
Expand Down Expand Up @@ -729,6 +767,27 @@ public void testTimestampMicrosType() throws Exception {
IllegalArgumentException.class,
() -> new AvroSchemaConverter().convert(message(type)));
}

// Test that conversions for timestamp types only use APIs that are available in the user's Avro version
for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) {
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion);
final Schema converted = new AvroSchemaConverter()
.convert(Types.buildMessage()
.addField(Types.primitive(INT64, Type.Repetition.REQUIRED)
.as(LogicalTypeAnnotation.timestampType(
false, LogicalTypeAnnotation.TimeUnit.MICROS))
.length(1)
.named("timestamp_type"))
.named("TestAvro"));

assertEquals(
avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-micros" : "local-timestamp-micros",
converted
.getField("timestamp_type")
.schema()
.getLogicalType()
.getName());
}
}

@Test
Expand Down

0 comments on commit 312a15f

Please sign in to comment.