Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle the mapped and registered regions in memory_mapped_source #16865

Merged
merged 27 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e3fd5bb
don't register the padding
vuule Sep 4, 2024
47a494e
Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into …
vuule Sep 4, 2024
cc4ab53
works
vuule Sep 17, 2024
879d450
mild clean up
vuule Sep 17, 2024
b7b4935
bit more clean up
vuule Sep 18, 2024
63abe6a
well.. there it is
vuule Sep 18, 2024
9a26102
well.. there it is
vuule Sep 18, 2024
a1f487d
Merge branch 'rework-read_csv-ingest' of https://github.com/vuule/cud…
vuule Sep 18, 2024
f5e5bae
Merge branch 'branch-24.10' into rework-read_csv-ingest
vuule Sep 18, 2024
cea8b7f
Merge branch 'impr-buffer_register-exact-range' into bug-fix-mmaped-s…
vuule Sep 18, 2024
315119f
Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into …
vuule Sep 19, 2024
42d560a
Merge branch 'branch-24.12' into rework-read_csv-ingest
vuule Sep 19, 2024
8adbbf4
reorg file sources
vuule Sep 19, 2024
3b38774
done
vuule Sep 20, 2024
ad93cec
Merge branch 'rework-read_csv-ingest' of https://github.com/vuule/cud…
vuule Sep 20, 2024
0b33a4c
Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into …
vuule Sep 20, 2024
f490c30
Merge branch 'branch-24.12' of https://github.com/rapidsai/cudf into …
vuule Sep 20, 2024
e6d4111
Merge branch 'rework-read_csv-ingest' into bug-fix-mmaped-source
vuule Sep 20, 2024
f5ab98a
tests + fix
vuule Sep 21, 2024
2282897
Merge branch 'branch-24.12' into bug-fix-mmaped-source
vuule Sep 28, 2024
f8ae1d6
comment fix
vuule Oct 2, 2024
a6bc941
docs, max >=min check
vuule Oct 2, 2024
45e1e5e
Merge branch 'bug-fix-mmaped-source' of https://github.com/rapidsai/c…
vuule Oct 2, 2024
1d0ece4
Merge branch 'branch-24.12' into bug-fix-mmaped-source
vuule Oct 2, 2024
808a22c
bit more docs
vuule Oct 2, 2024
7e53ed3
Merge branch 'bug-fix-mmaped-source' of https://github.com/rapidsai/c…
vuule Oct 2, 2024
2d8363a
style
vuule Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,16 @@ class datasource {
*
* @param[in] filepath Path to the file to use
* @param[in] offset Bytes from the start of the file (the default is zero)
* @param[in] size Bytes from the offset; use zero for entire file (the default is zero)
* @param[in] max_size_estimate Maximum size of the input within the file (the default is zero,
* which means the whole file after `offset`)
* @param[in] min_size_estimate Minimum size of the input within the file (the default is zero,
* which means the whole file after `offset`)
* @return Constructed datasource object
*/
static std::unique_ptr<datasource> create(std::string const& filepath,
size_t offset = 0,
size_t size = 0);
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0);
Comment on lines +109 to +110
Copy link
Contributor Author

@vuule vuule Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the max includes the padding for the last row when reading a byte range. We're now passing the byte range size without padding as well, for the operations that should not include the padding, such as the buffer registration (because registering overlapping ranges fails, and this can happen when we read a CSV or a JSON file in chunks).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make that use case more obvious in the docstrings? I wouldn't know the difference between padded/non-padded and registration restrictions from this docstring.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to throw if max < min? If so that @throws would go into the docstring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docs here and in memory_mapped_source constructor. I think they add up to a full explanation, let me know what you think :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks.


/**
* @brief Creates a source from a host memory buffer.
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder(
namespace {

std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info,
size_t range_offset = 0,
size_t range_size = 0)
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0)
{
switch (info.type()) {
case io_type::FILEPATH: {
auto sources = std::vector<std::unique_ptr<cudf::io::datasource>>();
for (auto const& filepath : info.filepaths()) {
sources.emplace_back(cudf::io::datasource::create(filepath, range_offset, range_size));
sources.emplace_back(
cudf::io::datasource::create(filepath, offset, max_size_estimate, min_size_estimate));
}
return sources;
}
Expand Down Expand Up @@ -211,7 +213,8 @@ table_with_metadata read_json(json_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

return json::detail::read_json(datasources, options, stream, mr);
}
Expand All @@ -238,7 +241,8 @@ table_with_metadata read_csv(csv_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported.");

Expand Down
150 changes: 89 additions & 61 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <unistd.h>

#include <unordered_map>
#include <vector>

namespace cudf {
namespace io {
Expand All @@ -54,6 +55,30 @@ class file_source : public datasource {
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
Copy link
Contributor Author

@vuule vuule Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the implementation from the derived class because the memory mapped source now performs direct reads when the location is outside of the mapped range.

{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}

~file_source() override = default;

[[nodiscard]] bool supports_device_read() const override
Expand Down Expand Up @@ -138,40 +163,59 @@ class file_source : public datasource {
*/
class memory_mapped_source : public file_source {
public:
explicit memory_mapped_source(char const* filepath, size_t offset, size_t size)
explicit memory_mapped_source(char const* filepath,
size_t offset,
size_t map_size,
size_t register_size)
: file_source(filepath)
{
if (_file.size() != 0) {
map(_file.desc(), offset, size);
register_mmap_buffer();
map(_file.desc(), offset, map_size);
register_mmap_buffer(offset, register_size);
}
}

~memory_mapped_source() override
{
if (_map_addr != nullptr) {
munmap(_map_addr, _map_size);
unmap();
unregister_mmap_buffer();
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size);
}

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is only partially within the mapped region, copy to a new
vuule marked this conversation as resolved.
Show resolved Hide resolved
// host buffer to make the data safe to copy to the device
if (_reg_addr != nullptr and
(offset < _reg_offset or offset + read_size > (_reg_offset + _reg_size))) {
auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);

return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
std::vector<uint8_t>(src, src + read_size));
}

return std::make_unique<non_owning_buffer>(
static_cast<uint8_t*>(_map_addr) + (offset - _map_offset), read_size);
static_cast<uint8_t*>(_map_addr) + offset - _map_offset, read_size);
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size, dst);
}

auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);
std::memcpy(dst, src, read_size);
Expand All @@ -184,16 +228,18 @@ class memory_mapped_source : public file_source {
*
* Fixes nvbugs/4215160
*/
void register_mmap_buffer()
void register_mmap_buffer(size_t offset, size_t size)
{
if (_map_addr == nullptr or _map_size == 0 or not pageableMemoryAccessUsesHostPageTables()) {
return;
}
if (_map_addr == nullptr or not pageableMemoryAccessUsesHostPageTables()) { return; }

auto const result = cudaHostRegister(_map_addr, _map_size, cudaHostRegisterDefault);
if (result == cudaSuccess) {
_is_map_registered = true;
} else {
// Registered region must be within the mapped region
_reg_offset = std::max(offset, _map_offset);
_reg_size = std::min(size != 0 ? size : _map_size, (_map_offset + _map_size) - _reg_offset);

_reg_addr = static_cast<std::byte*>(_map_addr) - _map_offset + _reg_offset;
auto const result = cudaHostRegister(_reg_addr, _reg_size, cudaHostRegisterReadOnly);
if (result != cudaSuccess) {
_reg_addr = nullptr;
CUDF_LOG_WARN("cudaHostRegister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -205,10 +251,12 @@ class memory_mapped_source : public file_source {
*/
void unregister_mmap_buffer()
{
if (not _is_map_registered) { return; }
if (_reg_addr == nullptr) { return; }

auto const result = cudaHostUnregister(_map_addr);
if (result != cudaSuccess) {
auto const result = cudaHostUnregister(_reg_addr);
if (result == cudaSuccess) {
_reg_addr = nullptr;
} else {
CUDF_LOG_WARN("cudaHostUnregister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -226,52 +274,30 @@ class memory_mapped_source : public file_source {

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
if (_map_size == 0) { return; }

// Check if accessing a region within already mapped area
_map_addr = mmap(nullptr, _map_size, PROT_READ, MAP_PRIVATE, fd, _map_offset);
CUDF_EXPECTS(_map_addr != MAP_FAILED, "Cannot create memory mapping");
}

private:
size_t _map_size = 0;
size_t _map_offset = 0;
void* _map_addr = nullptr;
bool _is_map_registered = false;
};

/**
* @brief Implementation class for reading from a file using `read` calls
*
* Potentially faster than `memory_mapped_source` when only a small portion of the file is read
* through the host.
*/
class direct_read_source : public file_source {
public:
explicit direct_read_source(char const* filepath) : file_source(filepath) {}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
void unmap()
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
if (_map_addr != nullptr) {
auto const result = munmap(_map_addr, _map_size);
if (result != 0) { CUDF_LOG_WARN("munmap failed with {}", result); }
_map_addr = nullptr;
}
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);
private:
size_t _map_offset = 0;
size_t _map_size = 0;
void* _map_addr = nullptr;

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}
size_t _reg_offset = 0;
size_t _reg_size = 0;
void* _reg_addr = nullptr;
};

/**
Expand Down Expand Up @@ -431,16 +457,18 @@ class user_datasource_wrapper : public datasource {

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
size_t offset,
size_t size)
size_t max_size_estimate,
size_t min_size_estimate)
{
#ifdef CUFILE_FOUND
if (cufile_integration::is_always_enabled()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
return std::make_unique<file_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
return std::make_unique<memory_mapped_source>(
filepath.c_str(), offset, max_size_estimate, min_size_estimate);
}

std::unique_ptr<datasource> datasource::create(host_buffer const& buffer)
Expand Down
35 changes: 35 additions & 0 deletions cpp/tests/io/csv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2516,4 +2516,39 @@ TEST_F(CsvReaderTest, UTF8BOM)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result_view, expected);
}

void expect_buffers_equal(cudf::io::datasource::buffer* lhs, cudf::io::datasource::buffer* rhs)
{
ASSERT_EQ(lhs->size(), rhs->size());
EXPECT_EQ(0, std::memcmp(lhs->data(), rhs->data(), lhs->size()));
}

TEST_F(CsvReaderTest, OutOfMapBoundsReads)
{
// write a lot of data into a file
auto filepath = temp_env->get_temp_dir() + "OutOfMapBoundsReads.csv";
auto const num_rows = 1 << 20;
auto const row = std::string{"0,1,2,3,4,5,6,7,8,9\n"};
auto const file_size = num_rows * row.size();
{
std::ofstream outfile(filepath, std::ofstream::out);
for (size_t i = 0; i < num_rows; ++i) {
outfile << row;
}
}

// Only memory map the middle of the file
auto source = cudf::io::datasource::create(filepath, file_size / 2, file_size / 4);
auto full_source = cudf::io::datasource::create(filepath);
auto const all_data = source->host_read(0, file_size);
auto ref_data = full_source->host_read(0, file_size);
expect_buffers_equal(ref_data.get(), all_data.get());

auto const start_data = source->host_read(file_size / 2, file_size / 2);
expect_buffers_equal(full_source->host_read(file_size / 2, file_size / 2).get(),
start_data.get());

auto const end_data = source->host_read(0, file_size / 2 + 512);
expect_buffers_equal(full_source->host_read(0, file_size / 2 + 512).get(), end_data.get());
}

CUDF_TEST_PROGRAM_MAIN()
Loading