From d784b1e230dcb9ade6935a80a3a4bc5053baaed4 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 11 Sep 2024 04:29:02 +0200 Subject: [PATCH] some probing 3 --- cpp/src/parquet/arrow/reader.cc | 10 +++++++++ .../parquet/encryption/encryption_internal.cc | 16 +++++++------- cpp/src/parquet/metadata.cc | 22 ++++++++----------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 285e2a597389d..eedf4c808efe0 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -213,6 +213,7 @@ class FileReaderImpl : public FileReader { manifest_.schema_fields.size(), ")"); } auto ctx = std::make_shared(); + // TMPTODO ctx->reader = reader_.get(); ctx->pool = pool_; ctx->iterator_factory = SomeRowGroupsFactory(row_groups); @@ -281,6 +282,9 @@ class FileReaderImpl : public FileReader { {"parquet.arrow.physicaltype", phys_type}, {"parquet.arrow.records_to_read", records_to_read}}); #endif +// reader + auto kvmd = this->reader_->metadata()->key_value_metadata(); + return reader->NextBatch(records_to_read, out); END_PARQUET_CATCH_EXCEPTIONS } @@ -1163,6 +1167,11 @@ class RowGroupGenerator { const int row_group, const std::vector& column_indices) { // Skips bound checks/pre-buffering, since we've done that already const int64_t batch_size = self->properties().batch_size(); +// self->reader_->metadata()->key_value_metadata()[row_group]; +// auto kvmd = self->reader_->metadata()->key_value_metadata(); + +// auto reader = parquet::ParquetFileReader::Open("", self->reader_properties_); + return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor) .Then([batch_size](const std::shared_ptr& table) -> ::arrow::Result { @@ -1251,6 +1260,7 @@ Future> FileReaderImpl::DecodeRowGroups( // in a sync context too so use `this` over `self` std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); diff --git a/cpp/src/parquet/encryption/encryption_internal.cc b/cpp/src/parquet/encryption/encryption_internal.cc index 604e154929245..ed2b48186f1cb 100644 --- a/cpp/src/parquet/encryption/encryption_internal.cc +++ b/cpp/src/parquet/encryption/encryption_internal.cc @@ -234,10 +234,10 @@ int32_t AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext throw ParquetException(ss.str()); } ARROW_LOG(INFO) << "GcmEncryp aad.data() " << aad.data(); - // if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(), - // static_cast(aad.size())))) { - // throw ParquetException("Couldn't set AAD"); - // } + if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(), + static_cast(aad.size())))) { + throw ParquetException("Couldn't set AAD"); + } // Encryption if (plaintext.size() > static_cast(std::numeric_limits::max())) { @@ -626,10 +626,10 @@ int32_t AesDecryptor::AesDecryptorImpl::GcmDecrypt(span ciphertex throw ParquetException(ss.str()); } ARROW_LOG(INFO) << "GcmDecrypt aad.data() " << aad.data(); - // if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(), - // static_cast(aad.size())))) { - // throw ParquetException("Couldn't set AAD"); - // } + if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(), + static_cast(aad.size())))) { + throw ParquetException("Couldn't set AAD"); + } // Decryption int decryption_length = diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 974afdd27a207..41632310a4005 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -826,20 +826,16 @@ class FileMetaData::FileMetaDataImpl { // TODO: if we have parameters for a new file_decryptor, we should create and pass it to the RowGroupMetaData::Make auto kvmd = metadata_->key_value_metadata; + if (static_cast(kvmd.size()) > i && file_decryptor_->file_aad() != kvmd[i].value) { + std::shared_ptr fds = std::make_shared(file_decryptor_->properties(), + kvmd[i].value, + file_decryptor_->algorithm(), + file_decryptor_->footer_key_metadata(), + file_decryptor_->pool()); -// for (size_t j = 0; j < kvmd.size(); j++) { -// if (kvmd[j].key == "aads") { -// -// } -// } -// if (!kvmd.empty() && kvmd[0].key == "file.crypto.metadata") { -// auto crypto_metadata = kvmd[0].value; -// auto file_decryptor = std::make_shared( -// crypto_metadata, properties_, encryption_algorithm(), footer_signing_key_metadata(), -// file_decryptor_->pool()); -// return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_, -// &writer_version_, file_decryptor); -// } + return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_, + &writer_version_, fds); + } auto fds = std::make_shared(file_decryptor_->properties(), file_decryptor_->file_aad(), file_decryptor_->algorithm(),