Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 26, 2024
1 parent f43e92d commit bf9ef6b
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 26 deletions.
43 changes: 29 additions & 14 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
Expand Down Expand Up @@ -1081,22 +1082,36 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids;
std::unordered_map<std::string, int> paths_to_index;

for (int i = 0; i < metadata->num_row_groups(); i++) {
auto row_group = metadata->RowGroup(i);
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem.get(), base_path, *row_group,
options.validate_column_chunk_paths));

// Insert the path, or increase the count of row groups. It will be assumed that the
// RowGroup of a file are ordered exactly as in the metadata file.
auto inserted_index = paths_to_index.emplace(
std::move(path), static_cast<int>(paths_with_row_group_ids.size()));
if (inserted_index.second) {
paths_with_row_group_ids.push_back({inserted_index.first->first, {}});
if (metadata_source.path() == "_metadata") {
auto kvmd = metadata->key_value_metadata();
for (int i = 0; i < metadata->num_row_groups(); i++) {
auto index = std::to_string(i);
PARQUET_ASSIGN_OR_THROW(auto path, kvmd->Get("row_group_file_" + index));
auto inserted_index = paths_to_index.emplace(
std::move(path), static_cast<int>(paths_with_row_group_ids.size()));
if (inserted_index.second) {
paths_with_row_group_ids.push_back({inserted_index.first->first, {}});
}
paths_with_row_group_ids[inserted_index.first->second].second.push_back(i);
}
} else {
for (int i = 0; i < metadata->num_row_groups(); i++) {
auto row_group = metadata->RowGroup(i);

ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem.get(), base_path, *row_group,
options.validate_column_chunk_paths));

// Insert the path, or increase the count of row groups. It will be assumed that the
// RowGroup of a file are ordered exactly as in the metadata file.
auto inserted_index = paths_to_index.emplace(
std::move(path), static_cast<int>(paths_with_row_group_ids.size()));
if (inserted_index.second) {
paths_with_row_group_ids.push_back({inserted_index.first->first, {}});
}
paths_with_row_group_ids[inserted_index.first->second].second.push_back(i);
}
paths_with_row_group_ids[inserted_index.first->second].second.push_back(i);
}

return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
std::move(filesystem), std::move(format), std::move(metadata), std::move(manifest),
std::move(physical_schema), base_path, std::move(options),
Expand Down
242 changes: 241 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"
#include "parquet/file_writer.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
Expand All @@ -51,6 +53,62 @@ using arrow::internal::checked_pointer_cast;
namespace arrow {
namespace dataset {

class DatasetTestBase : public ::testing::Test {
public:
void SetUp() override {
// Creates a mock file system using the current time point.
EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
std::chrono::system_clock::now(), {}));
ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

// Init dataset and partitioning.
ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning());

auto file_format = std::make_shared<ParquetFileFormat>();
auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());

// Write dataset.
auto dataset = std::make_shared<InMemoryDataset>(table_);
EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
}

void PrepareTableAndPartitioning() {
// Prepare table data.
auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "a" ],
[ 3, 6, 2, "a" ],
[ 4, 5, 1, "a" ],
[ 5, 4, 2, "a" ],
[ 6, 3, 1, "b" ],
[ 7, 2, 2, "b" ],
[ 8, 1, 1, "b" ],
[ 9, 0, 2, "b" ]
])"});

// Use a Hive-style partitioning scheme.
partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));
}

protected:
std::shared_ptr<fs::FileSystem> file_system_;
std::shared_ptr<Table> table_;
std::shared_ptr<Partitioning> partitioning_;
};

// Base class to test writing and reading encrypted dataset.
class DatasetEncryptionTestBase : public ::testing::Test {
public:
Expand Down Expand Up @@ -240,6 +298,188 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

// Write encrypted _metadata file and read the dataset using the metadata.
TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) {
// Create decryption properties.
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

// Set scan options.
auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

auto reader_properties = parquet::default_reader_properties();
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
reader_properties.file_decryption_properties(
crypto_factory_->GetFileDecryptionProperties(*kms_connection_config_,
*decryption_config));
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
parquet_encryption_config->kms_connection_config = kms_connection_config_;
parquet_encryption_config->encryption_config = encryption_config;

auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
parquet_file_write_options->parquet_encryption_config =
std::move(parquet_encryption_config);

std::vector<std::string> paths = {"part=a/part0.parquet", "part=c/part0.parquet",
"part=e/part0.parquet", "part=g/part0.parquet",
"part=i/part0.parquet"};

auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

auto table_schema =
schema({field("a", int64()), field("c", int64()), field("e", int64())});
std::shared_ptr<parquet::SchemaDescriptor> schema;
auto writer_props = parquet::WriterProperties::Builder().build();

ASSERT_OK_NO_THROW(
parquet::arrow::ToParquetSchema(table_schema.get(), *writer_props, &schema));
auto fmd_builder = parquet::FileMetaDataBuilder::Make(schema.get(), writer_props);
auto metadata = fmd_builder->Finish();

std::vector<std::string> values;
std::vector<std::string> keys;

// Read metadata from all dataset files and store AADs and paths as key-value metadata.
for (size_t i = 0; i < paths.size(); i++) {
const auto path = paths[i];
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();

keys.push_back("row_group_aad_" + std::to_string(i));
keys.push_back("row_group_file_" + std::to_string(i));
values.push_back(file_metadata->file_aad());
values.push_back(path);

// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata->AppendRowGroups(*file_metadata);
}

// Create a new FileMetadata object with the created AADs and paths as
// key_value_metadata.
fmd_builder = parquet::FileMetaDataBuilder::Make(schema.get(), writer_props);
std::shared_ptr<KeyValueMetadata> file_aad_metadata = key_value_metadata(keys, values);
auto metadata2 = fmd_builder->Finish(file_aad_metadata);
metadata2->AppendRowGroups(*metadata);

// TODO: Make sure plaintext footer mode works
// encryption_config->plaintext_footer = true;
file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

// Write metadata to _metadata file.
std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteEncryptedMetadataFile(*metadata2, stream, file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

// Set scan options.
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

ParquetFactoryOptions parquet_factory_options;
parquet_factory_options.partitioning = partitioning_;
parquet_factory_options.partition_base_dir = kBaseDir;

ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(metadata_path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();

ASSERT_TRUE(file_metadata->Equals(*metadata2));

// Create parquet dataset factory
ASSERT_OK_AND_ASSIGN(auto parquet_dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, parquet_factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, parquet_dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*table_, *combined_table);
}

// Write _metadata file and read the dataset using the metadata.
TEST_F(DatasetTestBase, ReadDatasetFromMetadata) {
auto reader_properties = parquet::default_reader_properties();

std::vector<std::string> paths = {"part=a/part0.parquet", "part=b/part0.parquet"};
std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

for (const auto& path : paths) {
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
metadata[0]->AppendRowGroups(*metadata[1]);

std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteMetaDataFile(*metadata[0], stream.get());
ARROW_EXPECT_OK(stream->Close());

auto file_format = std::make_shared<ParquetFileFormat>();
ParquetFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
return Status::OK();
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
return Status::OK();
}

Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
std::shared_ptr<WriterProperties> properties,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ PARQUET_EXPORT
::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
///
/// This writes one table in a single shot. To write a Parquet file with
Expand Down
Loading

0 comments on commit bf9ef6b

Please sign in to comment.