diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index ed5a3c9a2f..975e98eb39 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -27,7 +27,7 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
 use std::vec;
 
 use arrow_array::RecordBatch;
@@ -46,6 +46,7 @@ use futures::future::BoxFuture;
 use futures::StreamExt;
 use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
+use serde::{Deserialize, Serialize};
 use tracing::log::*;
 
 use super::cdc::should_write_cdc;
@@ -60,7 +61,9 @@ use crate::delta_datafusion::{
 };
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
+use crate::kernel::{
+    Action, ActionType, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType,
+};
 use crate::logstore::LogStoreRef;
 use crate::operations::cast::{cast_record_batch, merge_schema::merge_arrow_schema};
 use crate::protocol::{DeltaOperation, SaveMode};
@@ -162,6 +165,21 @@ pub struct WriteBuilder {
     configuration: HashMap<String, Option<String>>,
 }
 
+#[derive(Default, Debug, Serialize, Deserialize)]
+/// Metrics for the Write Operation
+pub struct WriteMetrics {
+    /// Number of files added
+    pub num_added_files: usize,
+    /// Number of files removed
+    pub num_removed_files: usize,
+    /// Number of partitions
+    pub num_partitions: usize,
+    /// Number of rows added
+    pub num_added_rows: usize,
+    /// Time taken to execute the entire operation
+    pub execution_time_ms: u64,
+}
+
 impl super::Operation<()> for WriteBuilder {}
 
 impl WriteBuilder {
@@ -765,6 +783,9 @@ impl std::future::IntoFuture for WriteBuilder {
         let this = self;
 
         Box::pin(async move {
+            let mut metrics = WriteMetrics::default();
+            let exec_start = Instant::now();
+
             if this.mode == SaveMode::Overwrite {
                 if let Some(snapshot) = &this.snapshot {
                     PROTOCOL.check_append_only(&snapshot.snapshot)?;
@@ -853,6 +874,8 @@
             let data = if !partition_columns.is_empty() {
                 // TODO partitioning should probably happen in its own plan ...
                 let mut partitions: HashMap<String, Vec<RecordBatch>> = HashMap::new();
+                let mut num_partitions = 0;
+                let mut num_added_rows = 0;
                 for batch in batches {
                     let real_batch = match new_schema.clone() {
                         Some(new_schema) => cast_record_batch(
@@ -869,7 +892,9 @@
                         partition_columns.clone(),
                         &real_batch,
                     )?;
+                    num_partitions += divided.len();
                     for part in divided {
+                        num_added_rows += part.record_batch.num_rows();
                         let key = part.partition_values.hive_partition_path();
                         match partitions.get_mut(&key) {
                             Some(part_batches) => {
@@ -881,11 +906,14 @@
                             }
                         }
                     }
+                metrics.num_partitions = num_partitions;
+                metrics.num_added_rows = num_added_rows;
                 partitions.into_values().collect::<Vec<_>>()
             } else {
                 match new_schema {
                     Some(ref new_schema) => {
                         let mut new_batches = vec![];
+                        let mut num_added_rows = 0;
                         for batch in batches {
                             new_batches.push(cast_record_batch(
                                 &batch,
@@ -893,10 +921,15 @@
                                 this.safe_cast,
                                 schema_drift, // Schema drifted so we have to add the missing columns/structfields.
                             )?);
+                            num_added_rows += batch.num_rows();
                         }
+                        metrics.num_added_rows = num_added_rows;
                         vec![new_batches]
                     }
-                    None => vec![batches],
+                    None => {
+                        metrics.num_added_rows = batches.iter().map(|b| b.num_rows()).sum();
+                        vec![batches]
+                    }
                 }
             };
 
@@ -1002,6 +1035,7 @@
                 None,
             )
             .await?;
+            metrics.num_added_files = add_actions.len();
             actions.extend(add_actions);
 
             // Collect remove actions if we are overwriting the table
@@ -1077,8 +1111,15 @@
                         }
                     };
                 }
+                metrics.num_removed_files = actions
+                    .iter()
+                    .filter(|a| a.action_type() == ActionType::Remove)
+                    .count();
            }
 
+            metrics.execution_time_ms =
+                Instant::now().duration_since(exec_start).as_millis() as u64;
+
             let operation = DeltaOperation::Write {
                 mode: this.mode,
                 partition_by: if !partition_columns.is_empty() {
@@ -1089,7 +1130,13 @@
                 predicate: predicate_str,
             };
 
-            let commit = CommitBuilder::from(this.commit_properties)
+            let mut commit_properties = this.commit_properties.clone();
+            commit_properties.app_metadata.insert(
+                "operationMetrics".to_owned(),
+                serde_json::to_value(&metrics)?,
+            );
+
+            let commit = CommitBuilder::from(commit_properties)
                 .with_actions(actions)
                 .build(
                     this.snapshot.as_ref().map(|f| f as &dyn TableReference),
@@ -1189,12 +1236,33 @@ mod tests {
     use itertools::Itertools;
     use serde_json::{json, Value};
 
+    async fn get_write_metrics(table: DeltaTable) -> WriteMetrics {
+        let mut commit_info = table.history(Some(1)).await.unwrap();
+        let metrics = commit_info
+            .first_mut()
+            .unwrap()
+            .info
+            .remove("operationMetrics")
+            .unwrap();
+        serde_json::from_value(metrics).unwrap()
+    }
+
+    fn assert_common_write_metrics(write_metrics: WriteMetrics) {
+        assert!(write_metrics.execution_time_ms > 0);
+        assert!(write_metrics.num_added_files > 0);
+    }
+
     #[tokio::test]
     async fn test_write_when_delta_table_is_append_only() {
         let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
         let batch = get_record_batch(None, false);
         // Append
         let table = write_batch(table, batch.clone()).await;
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
+        assert_eq!(write_metrics.num_removed_files, 0);
+        assert_common_write_metrics(write_metrics);
+
         // Overwrite
         let _err = DeltaOps(table)
             .write(vec![batch])
@@ -1226,6 +1294,12 @@ mod tests {
             .unwrap();
         assert_eq!(table.version(), 1);
         assert_eq!(table.get_files_count(), 1);
+
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
+        assert_eq!(write_metrics.num_added_files, table.get_files_count());
+        assert_common_write_metrics(write_metrics);
+
         table.load().await.unwrap();
         assert_eq!(table.history(None).await.unwrap().len(), 2);
         assert_eq!(
@@ -1233,7 +1307,7 @@
                 .info
                 .clone()
                 .into_iter()
-                .filter(|(k, _)| k != "clientVersion")
+                .filter(|(k, _)| k == "k1")
                 .collect::<HashMap<String, Value>>(),
             metadata
         );
@@ -1249,6 +1323,11 @@
             .unwrap();
         assert_eq!(table.version(), 2);
         assert_eq!(table.get_files_count(), 2);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
+        assert_eq!(write_metrics.num_added_files, 1);
+        assert_common_write_metrics(write_metrics);
+
         table.load().await.unwrap();
         assert_eq!(table.history(None).await.unwrap().len(), 3);
         assert_eq!(
@@ -1256,7 +1335,7 @@ mod tests {
                 .info
                 .clone()
                 .into_iter()
-                .filter(|(k, _)| k != "clientVersion")
+                .filter(|(k, _)| k == "k1")
                 .collect::<HashMap<String, Value>>(),
             metadata
         );
@@ -1265,13 +1344,18 @@
        let metadata: HashMap<String, Value> =
            HashMap::from_iter(vec![("k2".to_string(), json!("v2.1"))]);
        let mut table = DeltaOps(table)
-            .write(vec![batch])
+            .write(vec![batch.clone()])
            .with_save_mode(SaveMode::Overwrite)
            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
            .await
            .unwrap();
        assert_eq!(table.version(), 3);
        assert_eq!(table.get_files_count(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
+        assert!(write_metrics.num_removed_files > 0);
+        assert_common_write_metrics(write_metrics);
+
        table.load().await.unwrap();
        assert_eq!(table.history(None).await.unwrap().len(), 4);
        assert_eq!(
@@ -1279,7 +1363,7 @@ mod tests {
                .info
                .clone()
                .into_iter()
-                .filter(|(k, _)| k != "clientVersion")
+                .filter(|(k, _)| k == "k2")
                .collect::<HashMap<String, Value>>(),
            metadata
        );
@@ -1302,6 +1386,9 @@
        )
        .unwrap();
        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 2);
+        assert_common_write_metrics(write_metrics);
 
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
@@ -1326,6 +1413,10 @@
            .await
            .unwrap();
 
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 3);
+        assert_common_write_metrics(write_metrics);
+
        let expected = [
            "+-------+",
            "| value |",
@@ -1359,6 +1450,10 @@
        .unwrap();
        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();
 
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 1);
+        assert_common_write_metrics(write_metrics);
+
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())),
@@ -1394,7 +1489,9 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
-        assert_eq!(table.get_files_count(), 1)
+        assert_eq!(table.get_files_count(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
    }
 
    #[tokio::test]
@@ -1408,6 +1505,10 @@
            .unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.get_files_count(), 2);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert!(write_metrics.num_partitions > 0);
+        assert_eq!(write_metrics.num_added_files, 2);
+        assert_common_write_metrics(write_metrics);
 
        let table = DeltaOps::new_in_memory()
            .write(vec![batch])
@@ -1416,7 +1517,12 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
-        assert_eq!(table.get_files_count(), 4)
+        assert_eq!(table.get_files_count(), 4);
+
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert!(write_metrics.num_partitions > 0);
+        assert_eq!(write_metrics.num_added_files, 4);
+        assert_common_write_metrics(write_metrics);
    }
 
    #[tokio::test]
@@ -1429,6 +1535,9 @@
            .unwrap();
        assert_eq!(table.version(), 0);
 
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
+
        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
        for field in batch.schema().fields() {
            if field.name() != "modified" {
@@ -1475,6 +1584,9 @@ mod tests {
        let fields = new_schema.fields();
        let names = fields.map(|f| f.name()).collect::<Vec<&String>>();
        assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]);
+
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
    }
 
    #[tokio::test]
@@ -1488,6 +1600,10 @@ mod tests {
            .unwrap();
        assert_eq!(table.version(), 0);
 
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert!(write_metrics.num_partitions > 0);
+        assert_common_write_metrics(write_metrics);
+
        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
        for field in batch.schema().fields() {
            if field.name() != "modified" {
@@ -1536,6 +1652,10 @@ mod tests {
        assert_eq!(names, vec!["id", "inserted_by", "modified", "value"]);
        let part_cols = table.metadata().unwrap().partition_columns.clone();
        assert_eq!(part_cols, vec!["id", "value"]); // we want to preserve partitions
+
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert!(write_metrics.num_partitions > 0);
+        assert_common_write_metrics(write_metrics);
    }
 
    #[tokio::test]
@@ -1547,7 +1667,8 @@ mod tests {
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
-
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
        for field in batch.schema().fields() {
            if field.name() != "modified" {
@@ -1600,6 +1721,8 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
 
@@ -1655,6 +1778,8 @@
 
        let table = DeltaOps(table).write(vec![batch.clone()]).await.unwrap();
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        let schema: StructType = serde_json::from_value(json!({
            "type": "struct",
@@ -1676,7 +1801,7 @@
        assert_eq!(table.version(), 0);
 
        let table = DeltaOps(table).write(vec![batch.clone()]).await;
-        assert!(table.is_err())
+        assert!(table.is_err());
    }
 
    #[tokio::test]
@@ -1697,6 +1822,8 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        let actual = get_data(&table).await;
        let expected = DataType::Struct(Fields::from(vec![Field::new(
@@ -1735,6 +1862,8 @@
            .with_partition_columns(["string"])
            .await
            .unwrap();
+        let write_metrics: WriteMetrics = get_write_metrics(_table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        let table = crate::open_table(tmp_path.as_os_str().to_str().unwrap())
            .await
            .unwrap();
@@ -1778,6 +1907,9 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 4);
+        assert_common_write_metrics(write_metrics);
 
        let batch_add = RecordBatch::try_new(
            Arc::clone(&schema),
@@ -1796,6 +1928,9 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 1);
+        assert_common_write_metrics(write_metrics);
 
        let expected = [
            "+----+-------+------------+",
@@ -1834,6 +1969,8 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        // Take clones of these before an operation resulting in error, otherwise it will
        // be impossible to refer to an in-memory table
@@ -1876,6 +2013,8 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_common_write_metrics(write_metrics);
 
        let batch_add = RecordBatch::try_new(
            Arc::clone(&schema),
@@ -1898,6 +2037,9 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 3);
+        assert_common_write_metrics(write_metrics);
 
        let expected = [
            "+----+-------+------------+",
@@ -1959,6 +2101,9 @@
            .await
            .expect("Failed to write first batch");
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 3);
+        assert_common_write_metrics(write_metrics);
 
        let table = DeltaOps(table)
            .write([second_batch])
@@ -1966,6 +2111,10 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 2);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 1);
+        assert!(write_metrics.num_removed_files > 0);
+        assert_common_write_metrics(write_metrics);
 
        let snapshot_bytes = table
            .log_store
@@ -2025,6 +2174,10 @@
            .await
            .expect("Failed to write first batch");
        assert_eq!(table.version(), 1);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 3);
+        assert!(write_metrics.num_partitions > 0);
+        assert_common_write_metrics(write_metrics);
 
        let table = DeltaOps(table)
            .write([second_batch])
@@ -2033,6 +2186,12 @@
            .await
            .unwrap();
        assert_eq!(table.version(), 2);
+        let write_metrics: WriteMetrics = get_write_metrics(table.clone()).await;
+        assert_eq!(write_metrics.num_added_rows, 1);
+        assert!(write_metrics.num_partitions > 0);
+        assert!(write_metrics.num_removed_files > 0);
+        assert_common_write_metrics(write_metrics);
+
        let snapshot_bytes = table
            .log_store
            .read_commit_entry(2)
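
For reviewers, a minimal sketch (not part of the patch) of how a caller could read the metrics this change records in the commit info. It mirrors the `get_write_metrics` test helper above; the `deltalake_core` import paths, the `WriteMetrics` export location, and the tokio/arrow setup are assumptions made for illustration only.

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
// Assumed export paths; adjust to wherever WriteMetrics and DeltaOps are re-exported.
use deltalake_core::operations::write::WriteMetrics;
use deltalake_core::DeltaOps;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a small batch into an in-memory table, as the tests above do.
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "value",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = DeltaOps::new_in_memory().write(vec![batch]).await?;

    // With this patch, the write commit carries an "operationMetrics" entry
    // in its commit info, serialized from the WriteMetrics struct.
    let mut history = table.history(Some(1)).await?;
    let raw = history
        .first_mut()
        .and_then(|commit| commit.info.remove("operationMetrics"))
        .expect("operationMetrics should be present after this change");
    let metrics: WriteMetrics = serde_json::from_value(raw)?;

    println!(
        "added {} row(s) in {} file(s) over {} ms",
        metrics.num_added_rows, metrics.num_added_files, metrics.execution_time_ms
    );
    Ok(())
}
```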