Skip to content

Commit

Permalink
some probing 3
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 11, 2024
1 parent 4a8f0eb commit d784b1e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
10 changes: 10 additions & 0 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class FileReaderImpl : public FileReader {
manifest_.schema_fields.size(), ")");
}
auto ctx = std::make_shared<ReaderContext>();
// TMPTODO
ctx->reader = reader_.get();
ctx->pool = pool_;
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
Expand Down Expand Up @@ -281,6 +282,9 @@ class FileReaderImpl : public FileReader {
{"parquet.arrow.physicaltype", phys_type},
{"parquet.arrow.records_to_read", records_to_read}});
#endif
// reader
auto kvmd = this->reader_->metadata()->key_value_metadata();

return reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
}
Expand Down Expand Up @@ -1163,6 +1167,11 @@ class RowGroupGenerator {
const int row_group, const std::vector<int>& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
const int64_t batch_size = self->properties().batch_size();
// self->reader_->metadata()->key_value_metadata()[row_group];
// auto kvmd = self->reader_->metadata()->key_value_metadata();

// auto reader = parquet::ParquetFileReader::Open("", self->reader_properties_);

return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([batch_size](const std::shared_ptr<Table>& table)
-> ::arrow::Result<RecordBatchGenerator> {
Expand Down Expand Up @@ -1251,6 +1260,7 @@ Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;

RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/parquet/encryption/encryption_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ int32_t AesEncryptor::AesEncryptorImpl::GcmEncrypt(span<const uint8_t> plaintext
throw ParquetException(ss.str());
}
ARROW_LOG(INFO) << "GcmEncryp aad.data() " << aad.data();
// if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(),
// static_cast<int>(aad.size())))) {
// throw ParquetException("Couldn't set AAD");
// }
if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(),
static_cast<int>(aad.size())))) {
throw ParquetException("Couldn't set AAD");
}

// Encryption
if (plaintext.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
Expand Down Expand Up @@ -626,10 +626,10 @@ int32_t AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertex
throw ParquetException(ss.str());
}
ARROW_LOG(INFO) << "GcmDecrypt aad.data() " << aad.data();
// if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(),
// static_cast<int>(aad.size())))) {
// throw ParquetException("Couldn't set AAD");
// }
if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, aad.data(),
static_cast<int>(aad.size())))) {
throw ParquetException("Couldn't set AAD");
}

// Decryption
int decryption_length =
Expand Down
22 changes: 9 additions & 13 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,20 +826,16 @@ class FileMetaData::FileMetaDataImpl {
// TODO: if we have parameters for a new file_decryptor, we should create and pass it to the RowGroupMetaData::Make
auto kvmd = metadata_->key_value_metadata;

if (static_cast<int>(kvmd.size()) > i && file_decryptor_->file_aad() != kvmd[i].value) {
std::shared_ptr<InternalFileDecryptor> fds = std::make_shared<InternalFileDecryptor>(file_decryptor_->properties(),
kvmd[i].value,
file_decryptor_->algorithm(),
file_decryptor_->footer_key_metadata(),
file_decryptor_->pool());

// for (size_t j = 0; j < kvmd.size(); j++) {
// if (kvmd[j].key == "aads") {
//
// }
// }
// if (!kvmd.empty() && kvmd[0].key == "file.crypto.metadata") {
// auto crypto_metadata = kvmd[0].value;
// auto file_decryptor = std::make_shared<InternalFileDecryptor>(
// crypto_metadata, properties_, encryption_algorithm(), footer_signing_key_metadata(),
// file_decryptor_->pool());
// return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
// &writer_version_, file_decryptor);
// }
return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
&writer_version_, fds);
}
auto fds = std::make_shared<InternalFileDecryptor>(file_decryptor_->properties(),
file_decryptor_->file_aad(),
file_decryptor_->algorithm(),
Expand Down

0 comments on commit d784b1e

Please sign in to comment.