From 89938fafe3bfb32d029741d1d006896583b68579 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 10 Dec 2020 10:52:27 -0800 Subject: [PATCH] Add JNI wrapper for the cuFile API (GDS)(#6940) This adds a `libcufilejni.so` that's by default not built nor loaded. The unit tests are controlled similarly. Tested locally with the corresponding spark-rapids plugin changes. @jlowe @revans2 @abellina Authors: - Rong Ou Approvers: - Robert (Bobby) Evans URL: https://github.com/rapidsai/cudf/pull/6940 --- java/README.md | 13 + java/pom.xml | 25 ++ java/src/main/java/ai/rapids/cudf/CuFile.java | 123 ++++++ java/src/main/native/CMakeLists.txt | 9 +- java/src/main/native/src/CuFileJni.cpp | 359 ++++++++++++++++++ .../test/java/ai/rapids/cudf/CuFileTest.java | 61 +++ 6 files changed, 588 insertions(+), 2 deletions(-) create mode 100644 java/src/main/java/ai/rapids/cudf/CuFile.java create mode 100644 java/src/main/native/src/CuFileJni.cpp create mode 100644 java/src/test/java/ai/rapids/cudf/CuFileTest.java diff --git a/java/README.md b/java/README.md index 03b85aa8819..421d6fa5e95 100644 --- a/java/README.md +++ b/java/README.md @@ -119,3 +119,16 @@ then build the jar: cd src/cudf/java mvn clean install -DPER_THREAD_DEFAULT_STREAM=ON ``` + +## GPUDirect Storage (GDS) + +The JNI code can be built with *GPUDirect Storage* (GDS) support, which enables direct copying +between GPU device buffers and supported filesystems (see +https://docs.nvidia.com/gpudirect-storage/). + +To enable GDS support, first make sure GDS is installed (see +https://docs.nvidia.com/gpudirect-storage/troubleshooting-guide/index.html), then run: +```shell script +cd src/cudf/java +mvn clean install -DUSE_GDS=ON +``` diff --git a/java/pom.xml b/java/pom.xml index ea47ba6ad8f..49a54d4e1ec 100755 --- a/java/pom.xml +++ b/java/pom.xml @@ -147,6 +147,7 @@ OFF OFF INFO + OFF ${project.build.directory}/cmake-build 1.7.30 @@ -158,6 +159,28 @@ -Wno-deprecated-declarations + + no-cufile-tests + + + USE_GDS + !ON + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + **/CuFileTest.java + + + + + + release @@ -349,6 +372,7 @@ + @@ -479,6 +503,7 @@ ${native.build.path} libcudfjni.so + libcufilejni.so diff --git a/java/src/main/java/ai/rapids/cudf/CuFile.java b/java/src/main/java/ai/rapids/cudf/CuFile.java new file mode 100644 index 00000000000..d4be9a0194a --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/CuFile.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * JNI wrapper for accessing the cuFile API. + *

+ * Using this wrapper requires GPUDirect Storage (GDS)/cuFile to be installed in the target + * environment, and the jar to be built with `USE_GDS=ON`. Otherwise it will throw an exception when + * loading. + *

+ * The Java APIs are experimental and subject to change. + * + * @see GDS documentation + */ +public class CuFile { + private static final Logger log = LoggerFactory.getLogger(CuFile.class); + private static boolean initialized = false; + private static long driverPointer = 0; + + static { + initialize(); + } + + /** + * Load the native libraries needed for libcufilejni, if not loaded already; open the cuFile + * driver, and add a shutdown hook to close it. + */ + private static synchronized void initialize() { + if (!initialized) { + try { + NativeDepsLoader.loadNativeDeps(new String[]{"cufilejni"}); + driverPointer = createDriver(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + destroyDriver(driverPointer); + })); + initialized = true; + } catch (Throwable t) { + log.error("Could not load cuFile jni library...", t); + } + } + } + + private static native long createDriver(); + + private static native void destroyDriver(long pointer); + + /** + * Check if the libcufilejni library is loaded. + * + * @return true if the libcufilejni library has been successfully loaded. + */ + public static boolean libraryLoaded() { + return initialized; + } + + /** + * Write a device buffer to a given file path synchronously. + *

+ * This method is NOT thread safe if the path points to the same file on disk. + * + * @param path The file path to copy to. + * @param file_offset The file offset from which to write the buffer. + * @param buffer The device buffer to copy from. + * @return The file offset from which the buffer was appended. + */ + public static void writeDeviceBufferToFile(File path, long file_offset, + BaseDeviceMemoryBuffer buffer) { + writeToFile(path.getAbsolutePath(), file_offset, buffer.getAddress(), buffer.getLength()); + } + + /** + * Append a device buffer to a given file path synchronously. + *

+ * This method is NOT thread safe if the path points to the same file on disk. + * + * @param path The file path to copy to. + * @param buffer The device buffer to copy from. + * @return The file offset from which the buffer was appended. + */ + public static long appendDeviceBufferToFile(File path, BaseDeviceMemoryBuffer buffer) { + return appendToFile(path.getAbsolutePath(), buffer.getAddress(), buffer.getLength()); + } + + /** + * Read a file into a device buffer synchronously. + *

+ * This method is NOT thread safe if the path points to the same file on disk. + * + * @param buffer The device buffer to copy into. + * @param path The file path to copy from. + * @param fileOffset The file offset from which to copy the content. + */ + public static void readFileToDeviceBuffer(BaseDeviceMemoryBuffer buffer, File path, + long fileOffset) { + readFromFile(buffer.getAddress(), buffer.getLength(), path.getAbsolutePath(), fileOffset); + } + + private static native void writeToFile(String path, long file_offset, long address, long length); + + private static native long appendToFile(String path, long address, long length); + + private static native void readFromFile(long address, long length, String path, long fileOffset); +} diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt index 9160f857b4e..6d658b6d80b 100755 --- a/java/src/main/native/CMakeLists.txt +++ b/java/src/main/native/CMakeLists.txt @@ -289,8 +289,7 @@ include_directories("${THRUST_INCLUDE}" "${JNI_INCLUDE_DIRS}" "${CUDF_INCLUDE}" "${RMM_INCLUDE}" - "${ARROW_INCLUDE}" - "$<$:${cuFile_INCLUDE_DIRS}>") + "${ARROW_INCLUDE}") ################################################################################################### # - library paths --------------------------------------------------------------------------------- @@ -321,6 +320,12 @@ add_library(cudfjni SHARED ${SOURCE_FILES}) #Override RPATH for cudfjni SET_TARGET_PROPERTIES(cudfjni PROPERTIES BUILD_RPATH "\$ORIGIN") +if(USE_GDS) + add_library(cufilejni SHARED "src/CuFileJni.cpp") + target_include_directories(cufilejni PRIVATE "${cuFile_INCLUDE_DIRS}") + target_link_libraries(cufilejni PRIVATE "${cuFile_LIBRARIES}") +endif(USE_GDS) + ################################################################################################### # - build options --------------------------------------------------------------------------------- diff --git a/java/src/main/native/src/CuFileJni.cpp b/java/src/main/native/src/CuFileJni.cpp new file mode 100644 index 00000000000..cd563bb4b29 --- /dev/null +++ b/java/src/main/native/src/CuFileJni.cpp @@ -0,0 +1,359 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include +#include + +#include + +#include "jni_utils.hpp" + +namespace { + +/** + * @brief Get the error description based on the CUDA driver error code. + * + * @param cu_result CUDA driver error code. + * @return Description for the error. + */ +char const *GetCuErrorString(CUresult cu_result) { + char const *description; + if (cuGetErrorName(cu_result, &description) != CUDA_SUCCESS) + description = "unknown cuda error"; + return description; +} + +/** + * @brief Get the error description based on the integer error code. + * + * cuFile APIs return both cuFile specific error codes as well as POSIX error codes for ease of use. + * + * @param error_code Integer error code. + * @return Description of the error. + */ +std::string cuFileGetErrorString(int error_code) { + return IS_CUFILE_ERR(error_code) ? std::string(CUFILE_ERRSTR(error_code)) : + std::string(std::strerror(error_code)); +} + +/** + * @brief Get the error description based on the cuFile return status. + * + * @param status cuFile return status. + * @return Description of the error. + */ +std::string cuFileGetErrorString(CUfileError_t status) { + std::string error = cuFileGetErrorString(status.err); + if (IS_CUDA_ERR(status)) { + error.append(".").append(GetCuErrorString(status.cu_err)); + } + return error; +} + +/** + * @brief RAII wrapper for the cuFile driver. + */ +class cufile_driver { +public: + /** @brief Construct a new driver instance by opening the cuFile driver. */ + cufile_driver() { + auto const status = cuFileDriverOpen(); + if (status.err != CU_FILE_SUCCESS) { + CUDF_FAIL("Failed to initialize cuFile driver: " + cuFileGetErrorString(status)); + } + } + + // Disable copy (and move) semantics. + cufile_driver(cufile_driver const &) = delete; + cufile_driver &operator=(cufile_driver const &) = delete; + + /** @brief Destroy the driver instance by closing the cuFile driver. */ + ~cufile_driver() { cuFileDriverClose(); } +}; + +/** @brief RAII wrapper for a device buffer used by cuFile. */ +class cufile_buffer { +public: + /** + * @brief Construct a new cuFile buffer. + * + * @param device_pointer Pointer to the device buffer. + * @param size The size of the allocated device buffer. + * @param register_buffer Whether to register the buffer with cuFile. This should only be set to + * true if this buffer is being reused to fill a larger buffer. + */ + cufile_buffer(void *device_pointer, std::size_t size, bool register_buffer = false) + : device_pointer_{device_pointer}, size_{size}, register_buffer_{register_buffer} { + if (register_buffer_) { + auto const status = cuFileBufRegister(device_pointer_, size_, 0); + if (status.err != CU_FILE_SUCCESS) { + CUDF_FAIL("Failed to register cuFile buffer: " + cuFileGetErrorString(status)); + } + } + } + + // Disable copy (and move) semantics. + cufile_buffer(cufile_buffer const &) = delete; + cufile_buffer &operator=(cufile_buffer const &) = delete; + + /** @brief Destroy the buffer by de-registering it if necessary. */ + ~cufile_buffer() { + if (register_buffer_) { + cuFileBufDeregister(device_pointer_); + } + } + + /** + * @brief Get the pointer to the underlying device buffer. + * + * @return Pointer to the device buffer. + */ + void *device_pointer() const { return device_pointer_; } + + /** + * @brief Get the size of the underlying device buffer. + * + * @return The size of the device buffer. + */ + std::size_t size() const { return size_; } + +private: + /// Pointer to the device buffer. + void *device_pointer_; + /// Size of the device buffer. + std::size_t size_; + /// Whether to register the buffer with cuFile. + bool register_buffer_; +}; + +/** @brief RAII wrapper for a file descriptor and the corresponding cuFile handle. */ +class cufile_file { +public: + /** + * @brief Construct a file wrapper. + * + * Should not be called directly; use the following factory methods instead. + * + * @param file_descriptor A valid file descriptor. + */ + explicit cufile_file(int file_descriptor) : file_descriptor_{file_descriptor} { + CUfileDescr_t cufile_descriptor{CU_FILE_HANDLE_TYPE_OPAQUE_FD, file_descriptor_}; + auto const status = cuFileHandleRegister(&cufile_handle_, &cufile_descriptor); + if (status.err != CU_FILE_SUCCESS) { + close(file_descriptor_); + CUDF_FAIL("Failed to register cuFile handle: " + cuFileGetErrorString(status)); + } + } + + /** + * @brief Factory method to create a file wrapper for reading. + * + * @param path Absolute path of the file to read from. + * @return std::unique_ptr for reading. + */ + static auto make_reader(char const *path) { + auto const file_descriptor = open(path, O_RDONLY | O_DIRECT); + if (file_descriptor < 0) { + CUDF_FAIL("Failed to open file to read: " + cuFileGetErrorString(errno)); + } + return std::make_unique(file_descriptor); + } + + /** + * @brief Factory method to create a file wrapper for writing. + * + * @param path Absolute path of the file to write to. + * @return std::unique_ptr for writing. + */ + static auto make_writer(char const *path) { + auto const file_descriptor = open(path, O_CREAT | O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR); + if (file_descriptor < 0) { + CUDF_FAIL("Failed to open file to write: " + cuFileGetErrorString(errno)); + } + return std::make_unique(file_descriptor); + } + + // Disable copy (and move) semantics. + cufile_file(cufile_file const &) = delete; + cufile_file &operator=(cufile_file const &) = delete; + + /** @brief Destroy the file wrapper by de-registering the cuFile handle and closing the file. */ + ~cufile_file() { + cuFileHandleDeregister(cufile_handle_); + close(file_descriptor_); + } + + /** + * @brief Read the file into a device buffer. + * + * @param buffer Device buffer to read the file content into. + * @param file_offset Starting offset from which to read the file. + */ + void read(cufile_buffer const &buffer, std::size_t file_offset) const { + auto const status = + cuFileRead(cufile_handle_, buffer.device_pointer(), buffer.size(), file_offset, 0); + + if (status < 0) { + if (IS_CUFILE_ERR(status)) { + CUDF_FAIL("Failed to read file into buffer: " + cuFileGetErrorString(status)); + } else { + CUDF_FAIL("Failed to read file into buffer: " + cuFileGetErrorString(errno)); + } + } + + CUDF_EXPECTS(status == buffer.size(), "Size of bytes read is different from buffer size"); + } + + /** + * @brief Write a device buffer to the file. + * + * @param buffer The device buffer to write. + * @param file_offset Starting offset from which to write the file. + */ + void write(cufile_buffer const &buffer, std::size_t file_offset) { + auto const status = + cuFileWrite(cufile_handle_, buffer.device_pointer(), buffer.size(), file_offset, 0); + + if (status < 0) { + if (IS_CUFILE_ERR(status)) { + CUDF_FAIL("Failed to write buffer to file: " + cuFileGetErrorString(status)); + } else { + CUDF_FAIL("Failed to write buffer to file: " + cuFileGetErrorString(errno)); + } + } + + CUDF_EXPECTS(status == buffer.size(), "Size of bytes written is different from buffer size"); + } + + /** + * @brief Append a device buffer to the file. + * + * @param buffer The device buffer to write. + * @return The file offset from which the buffer was appended. + */ + std::size_t append(cufile_buffer const &buffer) { + auto const status = lseek(file_descriptor_, 0, SEEK_END); + if (status < 0) { + CUDF_FAIL("Failed to seek end of file: " + cuFileGetErrorString(errno)); + } + + auto const file_offset = static_cast(status); + write(buffer, file_offset); + return file_offset; + } + +private: + /// The underlying file descriptor. + int file_descriptor_; + /// The registered cuFile handle. + CUfileHandle_t cufile_handle_{}; +}; + +} // anonymous namespace + +extern "C" { + +/** + * @brief Create a new cuFile driver wrapper. + * + * @param env The JNI environment. + * @return Pointer address to the new driver wrapper instance. + */ +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_CuFile_createDriver(JNIEnv *env, jclass) { + try { + return reinterpret_cast(new cufile_driver()); + } + CATCH_STD(env, 0); +} + +/** + * @brief Destroy the given cuFile driver wrapper. + * + * @param env The JNI environment. + * @param pointer Pointer address to the driver wrapper instance. + */ +JNIEXPORT void JNICALL Java_ai_rapids_cudf_CuFile_destroyDriver(JNIEnv *env, jclass, + jlong pointer) { + try { + if (pointer != 0) { + auto *driver = reinterpret_cast(pointer); + delete driver; + } + } + CATCH_STD(env, ); +} + +/** + * @brief Write a device buffer into a given file path. + * + * @param env The JNI environment. + * @param path Absolute path of the file to copy the buffer to. + * @param file_offset The file offset from which the buffer was written. + * @param device_pointer Pointer address to the device buffer. + * @param size The size of the device buffer. + */ +JNIEXPORT void JNICALL Java_ai_rapids_cudf_CuFile_writeToFile(JNIEnv *env, jclass, jstring path, + jlong file_offset, + jlong device_pointer, jlong size) { + try { + cufile_buffer buffer{reinterpret_cast(device_pointer), static_cast(size)}; + auto writer = cufile_file::make_writer(env->GetStringUTFChars(path, nullptr)); + writer->write(buffer, file_offset); + } + CATCH_STD(env, ); +} + +/** + * @brief Append a device buffer into a given file path. + * + * @param env The JNI environment. + * @param path Absolute path of the file to copy the buffer to. + * @param device_pointer Pointer address to the device buffer. + * @param size The size of the device buffer. + */ +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_CuFile_appendToFile(JNIEnv *env, jclass, jstring path, + jlong device_pointer, jlong size) { + try { + cufile_buffer buffer{reinterpret_cast(device_pointer), static_cast(size)}; + auto writer = cufile_file::make_writer(env->GetStringUTFChars(path, nullptr)); + return writer->append(buffer); + } + CATCH_STD(env, -1); +} + +/** + * @brief Read from a given file path into a device buffer. + * + * @param env The JNI environment. + * @param device_pointer Pointer address to the device buffer. + * @param size The size of the device buffer. + * @param path Absolute path of the file to copy from. + * @param file_offset The file offset from which to copy content. + */ +JNIEXPORT void JNICALL Java_ai_rapids_cudf_CuFile_readFromFile(JNIEnv *env, jclass, + jlong device_pointer, jlong size, + jstring path, jlong file_offset) { + try { + cufile_buffer buffer{reinterpret_cast(device_pointer), static_cast(size)}; + auto const reader = cufile_file::make_reader(env->GetStringUTFChars(path, nullptr)); + reader->read(buffer, file_offset); + } + CATCH_STD(env, ); +} + +} // extern "C" diff --git a/java/src/test/java/ai/rapids/cudf/CuFileTest.java b/java/src/test/java/ai/rapids/cudf/CuFileTest.java new file mode 100644 index 00000000000..1ce0eaa550b --- /dev/null +++ b/java/src/test/java/ai/rapids/cudf/CuFileTest.java @@ -0,0 +1,61 @@ +package ai.rapids.cudf; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CuFileTest extends CudfTestBase { + @TempDir File tempDir; + + @AfterEach + void tearDown() { + if (PinnedMemoryPool.isInitialized()) { + PinnedMemoryPool.shutdown(); + } + } + + @Test + public void testCopyToFile() { + File tempFile = new File(tempDir, "tempFile"); + try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16); + DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); + DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); + HostMemoryBuffer dest = HostMemoryBuffer.allocate(16);) { + orig.setLong(0, 123456789); + from.copyFromHostBuffer(orig); + CuFile.writeDeviceBufferToFile(tempFile, 0, from); + CuFile.readFileToDeviceBuffer(to, tempFile, 0); + dest.copyFromDeviceBuffer(to); + assertEquals(123456789, dest.getLong(0)); + } + } + + @Test + public void testAppendToFile() { + File tempFile = new File(tempDir, "tempFile"); + try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16); + DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); + DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); + HostMemoryBuffer dest = HostMemoryBuffer.allocate(16);) { + orig.setLong(0, 123456789); + from.copyFromHostBuffer(orig); + CuFile.appendDeviceBufferToFile(tempFile, from); + + orig.setLong(0, 987654321); + from.copyFromHostBuffer(orig); + CuFile.appendDeviceBufferToFile(tempFile, from); + + CuFile.readFileToDeviceBuffer(to, tempFile, 0); + dest.copyFromDeviceBuffer(to); + assertEquals(123456789, dest.getLong(0)); + + CuFile.readFileToDeviceBuffer(to, tempFile, 16); + dest.copyFromDeviceBuffer(to); + assertEquals(987654321, dest.getLong(0)); + } + } +}