From 744d05b12897267803f46549e8bca3d31d57be4c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 14 Feb 2021 08:12:25 -0800 Subject: [PATCH] KAFKA-12327: Remove MethodHandle usage in CompressionType (#10123) We don't really need it and it causes problems in older Android versions and GraalVM native image usage (there are workarounds for the latter). Move the logic to separate classes that are only invoked when the relevant compression library is actually used. Place such classes in their own package and enforce via checkstyle that only these classes refer to compression library packages. To avoid cyclic dependencies, moved `BufferSupplier` to the `utils` package. Reviewers: Chia-Ping Tsai --- checkstyle/import-control.xml | 12 ++- .../clients/consumer/internals/Fetcher.java | 2 +- .../KafkaLZ4BlockInputStream.java | 11 +-- .../KafkaLZ4BlockOutputStream.java | 2 +- .../kafka/common/compress/SnappyFactory.java | 50 +++++++++++ .../kafka/common/compress/ZstdFactory.java | 58 +++++++++++++ .../record/AbstractLegacyRecordBatch.java | 1 + .../kafka/common/record/CompressionType.java | 82 ++++--------------- .../common/record/DefaultRecordBatch.java | 1 + .../common/record/FileLogInputStream.java | 1 + .../kafka/common/record/MemoryRecords.java | 1 + .../common/record/MutableRecordBatch.java | 1 + .../kafka/common/record/RecordBatch.java | 1 + .../{record => utils}/BufferSupplier.java | 2 +- .../consumer/internals/FetcherTest.java | 2 +- .../{record => compress}/KafkaLZ4Test.java | 5 +- .../common/record/BufferSupplierTest.java | 1 + .../common/record/CompressionTypeTest.java | 3 + .../common/record/DefaultRecordBatchTest.java | 1 + .../common/record/MemoryRecordsTest.java | 1 + .../src/main/scala/kafka/log/LogCleaner.scala | 3 +- .../src/main/scala/kafka/log/LogSegment.scala | 3 +- .../main/scala/kafka/log/LogValidator.scala | 5 +- .../test/scala/unit/kafka/log/LogTest.scala | 3 +- gradle/spotbugs-exclude.xml | 2 +- .../jmh/record/BaseRecordBatchBenchmark.java | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 2 +- .../raft/internals/RecordsBatchReader.java | 2 +- .../internals/RecordsBatchReaderTest.java | 2 +- .../kafka/snapshot/FileRawSnapshotTest.java | 2 +- .../kafka/snapshot/SnapshotWriterTest.java | 2 +- 31 files changed, 171 insertions(+), 95 deletions(-) rename clients/src/main/java/org/apache/kafka/common/{record => compress}/KafkaLZ4BlockInputStream.java (95%) rename clients/src/main/java/org/apache/kafka/common/{record => compress}/KafkaLZ4BlockOutputStream.java (99%) create mode 100644 clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java create mode 100644 clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java rename clients/src/main/java/org/apache/kafka/common/{record => utils}/BufferSupplier.java (99%) rename clients/src/test/java/org/apache/kafka/common/{record => compress}/KafkaLZ4Test.java (98%) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b6583701a57e..bc0491e2c9c1 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -69,6 +69,15 @@ + + + + + + + + + @@ -144,7 +153,7 @@ - + @@ -152,7 +161,6 @@ - diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 01efa762fc9b..2b9198fa1e75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -62,7 +62,7 @@ import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java similarity index 95% rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java index 850b1e96e55f..85e7f7b9bec9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; @@ -22,16 +22,17 @@ import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD; -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.BD; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.FLG; +import org.apache.kafka.common.utils.BufferSupplier; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.MAGIC; /** * A partial implementation of the v1.5.1 LZ4 Frame format. diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java index 591ab1693646..5c5aee416f61 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import java.io.IOException; import java.io.OutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java new file mode 100644 index 000000000000..b56273df8ea6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class SnappyFactory { + + private SnappyFactory() { } + + public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) { + try { + return new SnappyOutputStream(buffer); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + + public static InputStream wrapForInput(ByteBuffer buffer) { + try { + return new SnappyInputStream(new ByteBufferInputStream(buffer)); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java new file mode 100644 index 000000000000..8f4735e4d80d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.compress; + +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ZstdFactory { + + private ZstdFactory() { } + + public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) { + try { + // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller passes a small number of bytes to write (potentially a single byte). + return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE), 16 * 1024); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + + public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { + try { + // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller reads a small number of bytes (potentially a single byte). + return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), + RecyclingBufferPool.INSTANCE), 16 * 1024); + } catch (Throwable e) { + throw new KafkaException(e); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 83637640af49..59b2c68388a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index c2694ca7f68b..1b9754ffabbb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,9 +16,12 @@ */ package org.apache.kafka.common.record; -import com.github.luben.zstd.BufferPool; -import com.github.luben.zstd.RecyclingBufferPool; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream; +import org.apache.kafka.common.compress.SnappyFactory; +import org.apache.kafka.common.compress.ZstdFactory; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteBufferOutputStream; @@ -26,9 +29,6 @@ import java.io.BufferedOutputStream; import java.io.InputStream; import java.io.OutputStream; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; import java.nio.ByteBuffer; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -49,6 +49,7 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu } }, + // Shipped with the JDK GZIP(1, "gzip", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { @@ -76,23 +77,21 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu } }, + // We should only load classes from a given compression library when we actually use said compression library. This + // is because compression libraries include native code for a set of platforms and we want to avoid errors + // in case the platform is not supported and the compression library is not actually used. + // To ensure this, we only reference compression library code from classes that are only invoked when actual usage + // happens. + SNAPPY(2, "snappy", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - try { - return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer); - } catch (Throwable e) { - throw new KafkaException(e); - } + return SnappyFactory.wrapForOutput(buffer); } @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - try { - return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)); - } catch (Throwable e) { - throw new KafkaException(e); - } + return SnappyFactory.wrapForInput(buffer); } }, @@ -120,28 +119,12 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf ZSTD(4, "zstd", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - try { - // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance - // in cases where the caller passes a small number of bytes to write (potentially a single byte). - // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries - return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer, RecyclingBufferPool.INSTANCE), - 16 * 1024); - } catch (Throwable e) { - throw new KafkaException(e); - } + return ZstdFactory.wrapForOutput(buffer); } @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - try { - // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance - // in cases where the caller reads a small number of bytes (potentially a single byte). - // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries. - return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer), - RecyclingBufferPool.INSTANCE), 16 * 1024); - } catch (Throwable e) { - throw new KafkaException(e); - } + return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier); } }; @@ -207,37 +190,4 @@ else if (ZSTD.name.equals(name)) else throw new IllegalArgumentException("Unknown compression name: " + name); } - - // We should only have a runtime dependency on compression algorithms in case the native libraries don't support - // some platforms. - // - // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure - // they're only loaded if used. - // - // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger - // an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used. - - private static class SnappyConstructors { - static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream", - MethodType.methodType(void.class, InputStream.class)); - static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream", - MethodType.methodType(void.class, OutputStream.class)); - } - - private static class ZstdConstructors { - // It's ok to reference `BufferPool` since it doesn't load any native libraries - static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStreamNoFinalizer", - MethodType.methodType(void.class, InputStream.class, BufferPool.class)); - static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStreamNoFinalizer", - MethodType.methodType(void.class, OutputStream.class, BufferPool.class)); - } - - private static MethodHandle findConstructor(String className, MethodType methodType) { - try { - return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType); - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 33709c038796..62cab8fafe42 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 15c09dea32c8..10837d650a6d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch; import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 82a54afe6f7f..7d14f67d6e63 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Time; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index 8c0dc2363e94..fc924b0a8072 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 65a6a95fbe41..1cff7a238906 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import java.nio.ByteBuffer; diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java rename to clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java index 1a6c92c712fb..1688d105f068 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.common.utils; import java.nio.ByteBuffer; import java.util.ArrayDeque; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index c49bb6a700aa..9330f9eb51c5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -65,7 +65,7 @@ import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.DefaultRecordBatch; diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java similarity index 98% rename from clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java rename to clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java index 5f35f7d78034..a03c8308617c 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import net.jpountz.xxhash.XXHashFactory; +import org.apache.kafka.common.utils.BufferSupplier; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -34,7 +35,7 @@ import java.util.Random; import java.util.stream.Stream; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java index 9ead2881beae..e580be5ad71b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java index af696c3d608c..16b560d2e800 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 4e8690636090..9cd744a8f630 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index cab667a8b4d9..ebac0bd818df 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.extension.ExtensionContext; diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3225d9d1b314..df9722c6ee2e 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -21,7 +21,6 @@ import java.io.{File, IOException} import java.nio._ import java.util.Date import java.util.concurrent.TimeUnit - import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel} @@ -32,7 +31,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageExcep import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index b43833d4a701..37882ffa5259 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -20,7 +20,6 @@ import java.io.{File, IOException} import java.nio.file.{Files, NoSuchFileException} import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit - import kafka.common.LogSegmentOffsetOverflowException import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochFileCache @@ -30,7 +29,7 @@ import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.jdk.CollectionConverters._ import scala.math._ diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index b2e6222e8ec5..056be10be181 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -17,19 +17,18 @@ package kafka.log import java.nio.ByteBuffer - import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.BrokerTopicStats import kafka.utils.Logging import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} +import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ProduceResponse.RecordError -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index dab9eb1d71bd..1a953c53d5d9 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -23,7 +23,6 @@ import java.nio.file.{Files, Paths} import java.util.concurrent.{Callable, Executors} import java.util.regex.Pattern import java.util.{Collections, Optional, Properties} - import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix @@ -41,7 +40,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.easymock.EasyMock import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 3d0e0d2b6802..ab60dfdc7d0d 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -95,7 +95,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java index 834652e0b19f..30f908ea0881 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java @@ -19,7 +19,7 @@ import kafka.server.BrokerTopicStats; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 9b6eb576c475..b021e3a52a69 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java index 9a4c6b572c61..0817138e3ff0 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.protocol.DataInputStreamReadable; import org.apache.kafka.common.protocol.Readable; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java index 7dc1769c57f6..78ffd51befaf 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.raft.internals; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java index 37dac9feaee9..dc4f6359d6cd 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.snapshot; -import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java index 35652c75306c..27bdff2fda02 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Random; import java.util.Set; -import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClientTestContext;