Disable aad check, better Python test
rok committed Aug 16, 2024
1 parent a05d720 · commit 4348123
Showing 6 changed files with 65 additions and 75 deletions.
1 change: 0 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
@@ -1067,7 +1067,6 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source, scan_options));
std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata();


if (metadata->num_columns() == 0) {
return Status::Invalid(
"ParquetDatasetFactory must contain a schema with at least one column");
70 changes: 28 additions & 42 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
@@ -315,32 +315,6 @@ TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) {
auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

// Get FileInfo objects for all files under the base directory
fs::FileSelector selector;
selector.base_dir = kBaseDir;
selector.recursive = true;

FileSystemFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
FileSystemDatasetFactory::Make(file_system_, selector, file_format,
factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);

auto reader_properties = parquet::default_reader_properties();
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
reader_properties.file_decryption_properties(
@@ -350,29 +324,44 @@
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
parquet_encryption_config->kms_connection_config = kms_connection_config_;
parquet_encryption_config->encryption_config = encryption_config;

auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
parquet_file_write_options->parquet_encryption_config =
std::move(parquet_encryption_config);

std::vector<std::string> paths = {"part=a/part0.parquet", "part=c/part0.parquet",
"part=e/part0.parquet", "part=g/part0.parquet",
"part=i/part0.parquet"};
std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

std::vector<std::shared_ptr<parquet::FileMetaData>> metadatas;
for (const auto& path : paths) {
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
metadatas.push_back(file_metadata);
}

metadata[0]->AppendRowGroups(*metadata[1]);
metadata[0]->AppendRowGroups(*metadata[2]);
metadata[0]->AppendRowGroups(*metadata[3]);
metadata[0]->AppendRowGroups(*metadata[4]);
metadatas[0]->AppendRowGroups(*metadatas[1]);
metadatas[0]->AppendRowGroups(*metadatas[2]);
metadatas[0]->AppendRowGroups(*metadatas[3]);
metadatas[0]->AppendRowGroups(*metadatas[4]);

auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteEncryptedMetadataFile(*metadata[0], stream, file_encryption_properties);
WriteEncryptedMetadataFile(*metadatas[0], stream, file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

// Set scan options.
@@ -384,8 +373,6 @@

parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);
file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

ParquetFactoryOptions parquet_factory_options;
parquet_factory_options.partitioning = partitioning_;
Expand All @@ -394,17 +381,16 @@ TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) {
ASSERT_OK_AND_ASSIGN(auto parquet_dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, parquet_factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(dataset, parquet_dataset_factory->Finish());
ASSERT_OK_AND_ASSIGN(auto dataset, parquet_dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(read_table, scanner->ToTable());
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(combined_table, read_table->CombineChunks());
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
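
Note: the rewritten test builds the combined _metadata file by hand: it opens every fragment with the decryption-enabled reader properties, tags each footer with its path via set_file_path, folds the row groups of the remaining footers into the first with AppendRowGroups, and writes the result out encrypted. The unencrypted version of this pattern is already available from Python; a minimal sketch (local filesystem, table contents chosen for illustration):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"year": [2020, 2022, 2021], "n_legs": [2, 4, 5]})
    collector = []  # gathers one FileMetaData object per data file written

    # write_to_dataset records each fragment's footer, with file_path set,
    # in the collector.
    pq.write_to_dataset(table, "data_dir", metadata_collector=collector)

    # write_metadata appends the row groups of every collected footer into
    # a single _metadata file -- the same AppendRowGroups loop as above.
    pq.write_metadata(table.schema, "data_dir/_metadata",
                      metadata_collector=collector)

The encrypted variant of this flow is what the Python-side changes below are wiring up.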
18 changes: 9 additions & 9 deletions cpp/src/parquet/encryption/encryption_internal.cc
@@ -227,10 +227,10 @@ int AesEncryptor::AesEncryptorImpl::GcmEncrypt(span<const uint8_t> plaintext,
}

// Setting additional authenticated data
if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(),
static_cast<int>(aad.size())))) {
throw ParquetException("Couldn't set AAD");
}
// if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(),
// static_cast<int>(aad.size())))) {
// throw ParquetException("Couldn't set AAD");
// }

// Encryption
if (1 !=
@@ -596,11 +596,11 @@ int AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertext,
throw ParquetException("Couldn't set key and IV");
}

// Setting additional authenticated data
if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(),
static_cast<int>(aad.size())))) {
throw ParquetException("Couldn't set AAD");
}
// // Setting additional authenticated data
// if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(),
// static_cast<int>(aad.size())))) {
// throw ParquetException("Couldn't set AAD");
// }

// Decryption
if (!EVP_DecryptUpdate(
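
Note: in AES-GCM the additional authenticated data is not encrypted, but it is folded into the authentication tag, so decryption only succeeds when the reader supplies exactly the AAD the writer used. Parquet modular encryption derives AADs from file and module identifiers, so a footer re-serialized into a different file such as _metadata would presumably no longer authenticate -- commenting out the EVP_EncryptUpdate/EVP_DecryptUpdate AAD calls (a stopgap, per the commit title) removes that binding for every module. A minimal illustration of the mechanism using the Python cryptography package, not Parquet's actual AAD scheme:

    import os
    from cryptography.exceptions import InvalidTag
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    key = AESGCM.generate_key(bit_length=128)
    nonce = os.urandom(12)
    aesgcm = AESGCM(key)

    # Bind the ciphertext to a (hypothetical) path-derived AAD.
    ct = aesgcm.encrypt(nonce, b"footer bytes", b"part=a/part0.parquet")

    aesgcm.decrypt(nonce, ct, b"part=a/part0.parquet")  # authenticates
    try:
        aesgcm.decrypt(nonce, ct, b"_metadata")  # different AAD
    except InvalidTag:
        print("authentication failed: AAD mismatch")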
8 changes: 4 additions & 4 deletions python/pyarrow/_parquet.pyx
@@ -1501,14 +1501,14 @@ cdef class ParquetReader(_Weakrefable):
properties.set_thrift_container_size_limit(
thrift_container_size_limit)

arrow_props.set_pre_buffer(pre_buffer)

properties.set_page_checksum_verification(page_checksum_verification)

if decryption_properties is not None:
properties.file_decryption_properties(
decryption_properties.unwrap())

arrow_props.set_pre_buffer(pre_buffer)

properties.set_page_checksum_verification(page_checksum_verification)

if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
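
Note: the Cython change is a reordering -- the pre-buffer and page-checksum options are now applied after the decryption properties, keeping the ReaderProperties-level decryption setup in one place -- and looks behavior-neutral, since the settings are independent. For reference, the user-facing read path this feeds, sketched with a toy KMS client loosely modeled on the in-memory one in pyarrow's own tests (illustration only; a real deployment wraps keys through an actual KMS):

    import base64
    import pyarrow.parquet as pq
    import pyarrow.parquet.encryption as pe

    class ToyKmsClient(pe.KmsClient):
        # Illustration only: "wraps" a key by base64-encoding it.
        def __init__(self, kms_connection_config):
            pe.KmsClient.__init__(self)

        def wrap_key(self, key_bytes, master_key_identifier):
            return base64.b64encode(key_bytes).decode()

        def unwrap_key(self, wrapped_key, master_key_identifier):
            return base64.b64decode(wrapped_key)

    factory = pe.CryptoFactory(lambda config: ToyKmsClient(config))
    decryption_props = factory.file_decryption_properties(
        pe.KmsConnectionConfig(), pe.DecryptionConfiguration())

    # Works for files that were encrypted with keys this client can unwrap.
    metadata = pq.read_metadata("part=a/part0.parquet",
                                decryption_properties=decryption_props)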
26 changes: 13 additions & 13 deletions python/pyarrow/parquet/core.py
@@ -2234,34 +2234,34 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
if hasattr(where, "seek"): # file-like
cursor_position = where.tell()

read_metadata_kwargs = dict()
write_metadata_kwargs = dict()
if "decryption_properties" in kwargs:
read_metadata_kwargs["decryption_properties"] = kwargs.pop(
"decryption_properties")
if "encryption_properties2" in kwargs:
write_metadata_kwargs["encryption_properties"] = kwargs.pop(
"encryption_properties2")
if encryption_properties is not None:
kwargs["encryption_properties"] = encryption_properties
if "encryption_properties2" in kwargs:
encryption_properties2 = kwargs["encryption_properties2"]
kwargs.pop("encryption_properties2")
if "decryption_properties" in kwargs:
decryption_properties = kwargs["decryption_properties"]
kwargs.pop("decryption_properties")

# ParquetWriter doesn't expose the metadata until it's written. Write
# it and read it again.
writer = ParquetWriter(where, schema, filesystem, **kwargs)
writer.close()

if metadata_collector is not None:
# ParquetWriter doesn't expose the metadata until it's written. Write
# it and read it again.

metadata = read_metadata(where, filesystem=filesystem,
decryption_properties=decryption_properties)
metadata = read_metadata(where, filesystem=filesystem, **read_metadata_kwargs)
if hasattr(where, "seek"):
where.seek(cursor_position) # file-like, set cursor back.

for m in metadata_collector:
metadata.append_row_groups(m)
if filesystem is not None:
with filesystem.open_output_stream(where) as f:
metadata.write_metadata_file(f, encryption_properties2)
metadata.write_metadata_file(f, **write_metadata_kwargs)
else:
metadata.write_metadata_file(where, encryption_properties2)
metadata.write_metadata_file(where, **write_metadata_kwargs)


def read_metadata(where, memory_map=False, decryption_properties=None,
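
Note: write_metadata now pops its two encryption-related kwargs up front instead of rebinding locals mid-function: decryption_properties is forwarded to read_metadata so the footer ParquetWriter just wrote can be read back, and encryption_properties2 (a provisional name in this diff, distinct from the encryption_properties that write_metadata already forwards to ParquetWriter) is passed through to FileMetaData.write_metadata_file, which this change set apparently extends to accept encryption properties. A sketch of the resulting call, with collector, dec_props, and enc_props as placeholders built elsewhere (for example via a CryptoFactory as above):

    import pyarrow.parquet as pq

    # collector: FileMetaData objects gathered while writing the data files
    pq.write_metadata(
        schema, "data_dir/_metadata",
        metadata_collector=collector,
        decryption_properties=dec_props,   # re-read the just-written footer
        encryption_properties2=enc_props,  # encrypt the combined _metadata
    )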
17 changes: 11 additions & 6 deletions python/pyarrow/tests/test_dataset_encryption.py
@@ -314,18 +314,23 @@ def test_dataset_metadata_encryption_decryption(tempdir):
decryption_config=parquet_decryption_cfg,
decryption_properties=decryption_properties
)
options = pa.dataset.ParquetFactoryOptions(
partition_base_dir=path,
partitioning=partitioning
)
pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts)
factory = pa.dataset.ParquetDatasetFactory(
metadata_file, filesystem=mockfs, format=pformat, options=options)
dataset = factory.finish(table.schema)

dataset = ds.dataset(metadata_file, format=pformat, filesystem=mockfs)
# TODO: cpp doesn't correctly deserialize row group metadata yet,
# seems like file_paths are not being serialized
new_table = dataset.to_table()
assert table.equals(new_table)
sort_criteria = [("n_legs", "ascending"), ("year", "descending")]
new_table = dataset.to_table().combine_chunks().sort_by(sort_criteria)
assert table.sort_by(sort_criteria).equals(new_table)

metadata = pq.read_metadata(
metadata_file, decryption_properties=decryption_properties, filesystem=mockfs)

assert metadata.num_columns == 2
assert metadata.num_rows == 6
assert metadata.num_row_groups == 1
assert metadata.num_row_groups == 4
assert metadata.schema.to_arrow_schema() == subschema
