Skip to content

Commit

Permalink
KAFKA-12327: Remove MethodHandle usage in CompressionType (apache#10123)
Browse files Browse the repository at this point in the history
We don't really need it and it causes problems in older Android versions
and GraalVM native image usage (there are workarounds for the latter).

Move the logic to separate classes that are only invoked when the
relevant compression library is actually used. Place such classes
in their own package and enforce via checkstyle that only these
classes refer to compression library packages.

To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
package.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
ijuma committed Feb 14, 2021
1 parent b3313b8 commit 744d05b
Show file tree
Hide file tree
Showing 31 changed files with 171 additions and 95 deletions.
12 changes: 10 additions & 2 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>

<!-- Third-party compression libraries should only be references from this package -->
<subpackage name="compress">
<allow pkg="com.github.luben.zstd" />
<allow pkg="net.jpountz.lz4" />
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.xerial.snappy" />
</subpackage>

<subpackage name="message">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.protocol" />
Expand Down Expand Up @@ -144,15 +153,14 @@
</subpackage>

<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.header" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="com.github.luben.zstd" />
</subpackage>

<subpackage name="header">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.common.compress;

import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.utils.BufferSupplier;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.MAGIC;

/**
* A partial implementation of the v1.5.1 LZ4 Frame format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.common.compress;

import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.compress;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class SnappyFactory {

private SnappyFactory() { }

public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
try {
return new SnappyOutputStream(buffer);
} catch (Throwable e) {
throw new KafkaException(e);
}
}

public static InputStream wrapForInput(ByteBuffer buffer) {
try {
return new SnappyInputStream(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.compress;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class ZstdFactory {

private ZstdFactory() { }

public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
try {
// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller passes a small number of bytes to write (potentially a single byte).
return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
}

public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller reads a small number of bytes (potentially a single byte).
return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
RecyclingBufferPool.INSTANCE), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
*/
package org.apache.kafka.common.record;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream;
import org.apache.kafka.common.compress.SnappyFactory;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand All @@ -49,6 +49,7 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
}
},

// Shipped with the JDK
GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
Expand Down Expand Up @@ -76,23 +77,21 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
}
},

// We should only load classes from a given compression library when we actually use said compression library. This
// is because compression libraries include native code for a set of platforms and we want to avoid errors
// in case the platform is not supported and the compression library is not actually used.
// To ensure this, we only reference compression library code from classes that are only invoked when actual usage
// happens.

SNAPPY(2, "snappy", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
} catch (Throwable e) {
throw new KafkaException(e);
}
return SnappyFactory.wrapForOutput(buffer);
}

@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
return SnappyFactory.wrapForInput(buffer);
}
},

Expand Down Expand Up @@ -120,28 +119,12 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller passes a small number of bytes to write (potentially a single byte).
// It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries
return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer, RecyclingBufferPool.INSTANCE),
16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
return ZstdFactory.wrapForOutput(buffer);
}

@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller reads a small number of bytes (potentially a single byte).
// It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries.
return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer),
RecyclingBufferPool.INSTANCE), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
}
};

Expand Down Expand Up @@ -207,37 +190,4 @@ else if (ZSTD.name.equals(name))
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}

// We should only have a runtime dependency on compression algorithms in case the native libraries don't support
// some platforms.
//
// For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
// an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used.

private static class SnappyConstructors {
static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
MethodType.methodType(void.class, OutputStream.class));
}

private static class ZstdConstructors {
// It's ok to reference `BufferPool` since it doesn't load any native libraries
static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStreamNoFinalizer",
MethodType.methodType(void.class, InputStream.class, BufferPool.class));
static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStreamNoFinalizer",
MethodType.methodType(void.class, OutputStream.class, BufferPool.class));
}

private static MethodHandle findConstructor(String className, MethodType methodType) {
try {
return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch;
import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;

import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.kafka.common.record;
package org.apache.kafka.common.utils;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
Expand Down
Loading

0 comments on commit 744d05b

Please sign in to comment.