Skip to content

Commit

Permalink
fix: compile error with lifetime issues on optimize (delta-io#1843)
Browse files (browse the repository at this point in the history)
  • Loading branch information
dispanser committed Nov 13, 2023
1 parent 2b913b3 commit 21271ba
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl MergePlan {
Some(task_parameters.input_parameters.target_size as usize),
None,
)?;
let mut writer = PartitionWriter::try_with_config(object_store.clone(), writer_config)?;
let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;

let mut read_stream = read_stream.await?;

Expand Down Expand Up @@ -478,19 +478,7 @@ impl MergePlan {

let object_store_ref = context.object_store.clone();
// Read all batches into a vec
let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
.then(|file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref.clone(), file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.try_collect::<Vec<_>>()
.await?;
let batches = zorder::collect_batches(object_store_ref, files).await?;

// For each batch, compute the zorder key
let zorder_keys: Vec<ArrayRef> =
Expand Down Expand Up @@ -608,7 +596,7 @@ impl MergePlan {
for file in files.iter() {
debug!(" file {}", file.location);
}
let object_store_ref = log_store.object_store().clone();
let object_store_ref = log_store.object_store();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
Expand Down Expand Up @@ -636,14 +624,13 @@ impl MergePlan {
#[cfg(not(feature = "datafusion"))]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store().clone(),
log_store.object_store(),
// If there aren't enough bins to use all threads, then instead
// use threads within the bins. This is important for the case where
// the table is un-partitioned, in which case the entire table is just
// one big bin.
bins.len() <= num_cpus::get(),
));
let object_store = log_store.object_store().clone();

#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
Expand All @@ -652,17 +639,15 @@ impl MergePlan {
max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();
let log_store = log_store.clone();
futures::stream::iter(bins)
.map(move |(partition, files)| {
let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());

let object_store = object_store.clone();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
task_parameters.clone(),
partition,
files,
object_store,
log_store.object_store(),
batch_stream,
));
util::flatten_join_error(rewrite_result)
Expand Down Expand Up @@ -1107,6 +1092,26 @@ pub(super) mod zorder {
}
}

// #[cfg(not(feature = "datafusion"))]
/// Reads every record batch of every file in `files` into a single `Vec`.
///
/// This is an async function in disguise: it is declared as a plain `fn`
/// returning `impl Future` rather than as an `async fn`. Both arguments are
/// taken by value and moved into the returned future.
///
/// NOTE(review): presumably this shape is what works around the lifetime
/// compile error this helper was introduced for; confirm before turning it
/// into an `async fn`.
///
/// Files are opened one after another in the order `files` yields them, and
/// all of their batches are accumulated in memory before the future resolves.
///
/// # Errors
///
/// Resolves to the first `ParquetError` produced while building a reader for
/// a file or while pulling a batch from one of the resulting streams.
pub(super) fn collect_batches(
    object_store: ObjectStoreRef,
    files: MergeBin,
) -> impl Future<Output = Result<Vec<RecordBatch>, ParquetError>> {
    // `files` is owned and not used again, so it is consumed directly
    // instead of being cloned first.
    futures::stream::iter(files)
        .then(move |file| {
            // The closure runs once per file; each resulting future gets its
            // own handle to the store, which it then moves into the reader.
            let object_store = object_store.clone();
            async move {
                let file_reader = ParquetObjectReader::new(object_store, file);
                ParquetRecordBatchStreamBuilder::new(file_reader)
                    .await?
                    .build()
            }
        })
        // Stream of per-file batch streams -> one flat stream of batches.
        .try_flatten()
        .try_collect::<Vec<_>>()
}

#[cfg(feature = "datafusion")]
pub use self::datafusion::ZOrderExecContext;

Expand Down

0 comments on commit 21271ba

Please sign in to comment.