Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12327: Remove MethodHandle usage in CompressionType #10123

Merged
merged 5 commits into from
Feb 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>

<!-- Third-party compression libraries should only be referenced from this package -->
<subpackage name="compress">
<allow pkg="com.github.luben.zstd" />
<allow pkg="net.jpountz.lz4" />
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.xerial.snappy" />
</subpackage>

<subpackage name="message">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.protocol" />
Expand Down Expand Up @@ -144,15 +153,14 @@
</subpackage>

<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.header" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="com.github.luben.zstd" />
</subpackage>

<subpackage name="header">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.common.compress;

import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.utils.BufferSupplier;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.MAGIC;

/**
* A partial implementation of the v1.5.1 LZ4 Frame format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.common.compress;

import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.compress;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * Static helpers that create Snappy compression and decompression streams.
 *
 * <p>This class is the only place in this view that names the
 * {@code org.xerial.snappy} stream types directly; callers go through the two
 * static methods below.
 */
public class SnappyFactory {

    // Static-only holder: instantiation is not allowed.
    private SnappyFactory() { }

    /**
     * Wraps {@code buffer} in a {@link SnappyOutputStream}.
     *
     * @param buffer the stream the Snappy stream is constructed over
     * @return the newly created Snappy output stream
     * @throws KafkaException wrapping any {@link Throwable} (exceptions and errors alike)
     *         raised while constructing the Snappy stream
     */
    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
        OutputStream compressed;
        try {
            compressed = new SnappyOutputStream(buffer);
        } catch (Throwable cause) {
            // Throwable (not just Exception) so that Errors are also surfaced as KafkaException.
            throw new KafkaException(cause);
        }
        return compressed;
    }

    /**
     * Wraps {@code buffer} in a {@link SnappyInputStream} that reads through a
     * {@link ByteBufferInputStream} over the given buffer.
     *
     * @param buffer the byte buffer the Snappy stream reads from
     * @return the newly created Snappy input stream
     * @throws KafkaException wrapping any {@link Throwable} (exceptions and errors alike)
     *         raised while constructing either stream
     */
    public static InputStream wrapForInput(ByteBuffer buffer) {
        InputStream decompressed;
        try {
            ByteBufferInputStream source = new ByteBufferInputStream(buffer);
            decompressed = new SnappyInputStream(source);
        } catch (Throwable cause) {
            // Throwable (not just Exception) so that Errors are also surfaced as KafkaException.
            throw new KafkaException(cause);
        }
        return decompressed;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.compress;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * Static helpers that create buffered Zstd compression and decompression streams.
 *
 * <p>Both directions use {@link RecyclingBufferPool#INSTANCE} as the buffer pool of the
 * underlying Zstd stream and add a 16 KB {@code java.io} buffer on the uncompressed side.
 */
public class ZstdFactory {

    // Size of the uncompressed-side buffer: 16 KB. The Zstd streams have none by default,
    // so this keeps performance reasonable when callers read or write only a few bytes
    // at a time (potentially a single byte).
    private static final int UNCOMPRESSED_BUFFER_BYTES = 16 * 1024;

    // Static-only holder: instantiation is not allowed.
    private ZstdFactory() { }

    /**
     * Wraps {@code buffer} in a Zstd compressing stream fronted by a 16 KB
     * {@link BufferedOutputStream}.
     *
     * @param buffer the stream the Zstd stream is constructed over
     * @return a buffered stream that writes into the Zstd stream
     * @throws KafkaException wrapping any {@link Throwable} (exceptions and errors alike)
     *         raised while constructing the streams
     */
    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
        OutputStream buffered;
        try {
            OutputStream zstd = new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE);
            buffered = new BufferedOutputStream(zstd, UNCOMPRESSED_BUFFER_BYTES);
        } catch (Throwable cause) {
            throw new KafkaException(cause);
        }
        return buffered;
    }

    /**
     * Wraps {@code buffer} in a Zstd decompressing stream fronted by a 16 KB
     * {@link BufferedInputStream}.
     *
     * @param buffer the byte buffer the Zstd stream reads from
     * @param messageVersion not used by this method
     * @param decompressionBufferSupplier not used by this method; the Zstd stream is given
     *        {@link RecyclingBufferPool#INSTANCE} instead
     * @return a buffered stream that reads from the Zstd stream
     * @throws KafkaException wrapping any {@link Throwable} (exceptions and errors alike)
     *         raised while constructing the streams
     */
    public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
        InputStream buffered;
        try {
            InputStream source = new ByteBufferInputStream(buffer);
            InputStream zstd = new ZstdInputStreamNoFinalizer(source, RecyclingBufferPool.INSTANCE);
            buffered = new BufferedInputStream(zstd, UNCOMPRESSED_BUFFER_BYTES);
        } catch (Throwable cause) {
            throw new KafkaException(cause);
        }
        return buffered;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
*/
package org.apache.kafka.common.record;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream;
import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream;
import org.apache.kafka.common.compress.SnappyFactory;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand All @@ -49,6 +49,7 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
}
},

// Shipped with the JDK
GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
Expand Down Expand Up @@ -76,23 +77,21 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
}
},

// We should only load classes from a given compression library when we actually use said compression library. This
// is because compression libraries include native code for a set of platforms and we want to avoid errors
// in case the platform is not supported and the compression library is not actually used.
// To ensure this, we only reference compression library code from classes that are only invoked when actual usage
// happens.

SNAPPY(2, "snappy", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
} catch (Throwable e) {
throw new KafkaException(e);
}
return SnappyFactory.wrapForOutput(buffer);
}

@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
return SnappyFactory.wrapForInput(buffer);
}
},

Expand Down Expand Up @@ -120,28 +119,12 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller passes a small number of bytes to write (potentially a single byte).
// It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries
return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer, RecyclingBufferPool.INSTANCE),
16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
return ZstdFactory.wrapForOutput(buffer);
}

@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller reads a small number of bytes (potentially a single byte).
// It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries.
return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer),
RecyclingBufferPool.INSTANCE), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
}
};

Expand Down Expand Up @@ -207,37 +190,4 @@ else if (ZSTD.name.equals(name))
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}

// We should only have a runtime dependency on compression algorithms in case the native libraries don't support
// some platforms.
//
// For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
// an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used.

private static class SnappyConstructors {
static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
MethodType.methodType(void.class, OutputStream.class));
}

private static class ZstdConstructors {
// It's ok to reference `BufferPool` since it doesn't load any native libraries
static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStreamNoFinalizer",
MethodType.methodType(void.class, InputStream.class, BufferPool.class));
static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStreamNoFinalizer",
MethodType.methodType(void.class, OutputStream.class, BufferPool.class));
}

private static MethodHandle findConstructor(String className, MethodType methodType) {
try {
return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch;
import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;

import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.kafka.common.record;
package org.apache.kafka.common.utils;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
Expand Down
Loading