From 5be8e0e9558e3c70cdde3382272ed420684cca2a Mon Sep 17 00:00:00 2001 From: Emma Forman Ling Date: Thu, 12 Sep 2024 16:30:09 -0700 Subject: [PATCH] Add ComponentPath to FunctionUsageStats (#29839) This PR supports breaking usage down by component path. This makes it possible to see within a transaction how much bandwidth was used in each component, for example. There are still some TODOs mostly around snapshot import, export and file storage. Notable changes: - adding `ComponentRegistry` to `FinalTransaction` - some new helpers to get `ComponentPath` given document id - pass around `BTreeMap` from `Snapshot` to avoid needing to create a transaction just to convert `ComponentId` to `ComponentPath` for snapshot export GitOrigin-RevId: 86e4f205081331ae1ddae5a6192df558bb7c4691 --- crates/application/src/export_worker.rs | 48 ++++++--- crates/application/src/snapshot_import.rs | 3 +- .../common/src/components/component_path.rs | 2 +- .../src/bootstrap_model/user_facing.rs | 8 ++ crates/database/src/committer.rs | 34 ++++++- crates/database/src/component_registry.rs | 22 +++++ crates/database/src/database.rs | 7 +- crates/database/src/query/index_range.rs | 3 + crates/database/src/reads.rs | 3 + crates/database/src/snapshot_manager.rs | 10 ++ crates/database/src/transaction.rs | 33 +++++++ crates/events/src/usage.rs | 1 + crates/file_storage/src/core.rs | 17 +++- .../isolate/src/environment/action/storage.rs | 9 +- crates/pb/protos/usage.proto | 3 +- crates/usage_tracking/src/lib.rs | 99 +++++++++++-------- crates/value/src/table_mapping.rs | 7 +- 17 files changed, 243 insertions(+), 66 deletions(-) diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs index b24de1b2..46dd8f4c 100644 --- a/crates/application/src/export_worker.rs +++ b/crates/application/src/export_worker.rs @@ -27,6 +27,7 @@ use common::{ components::{ ComponentId, ComponentName, + ComponentPath, }, document::{ ParsedDocument, @@ -289,7 +290,7 @@ impl ExportWorker { ) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> { tracing::info!("Beginning snapshot export..."); let storage = &self.storage; - let (ts, tables, by_id_indexes, system_tables, component_tree) = { + let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables, component_tree) = { let mut tx = self.database.begin(Identity::system()).await?; let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; let component_tree = ComponentTree::new(&mut tx, component).await?; @@ -309,6 +310,7 @@ impl ExportWorker { ) }) .collect(); + let component_ids_to_paths = snapshot.component_ids_to_paths(); let system_tables = snapshot .table_registry .iter_active_system_tables() @@ -317,6 +319,7 @@ impl ExportWorker { ( tx.begin_timestamp(), tables, + component_ids_to_paths, by_id_indexes, system_tables, component_tree, @@ -335,6 +338,7 @@ impl ExportWorker { writer, component_tree, tables.clone(), + &component_ids_to_paths, ts, by_id_indexes, system_tables, @@ -355,6 +359,7 @@ impl ExportWorker { component_tree: &'a ComponentTree, zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>, tables: &'a mut BTreeMap, + component_ids_to_paths: &BTreeMap, snapshot_ts: RepeatableTimestamp, by_id_indexes: &BTreeMap, system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>, @@ -362,6 +367,10 @@ impl ExportWorker { usage: FunctionUsageTracker, ) -> anyhow::Result<()> { let namespace: TableNamespace = component_tree.id.into(); + let component_path = component_ids_to_paths + .get(&component_tree.id) + .cloned() + .unwrap_or_default(); let tablet_ids: BTreeSet<_> = tables .iter() .filter(|(_, (ns, ..))| *ns == namespace) @@ -468,6 +477,7 @@ impl ExportWorker { .transpose()?; usage .track_storage_call( + component_path.clone(), "snapshot_export", file_storage_entry.storage_id.clone(), content_type, @@ -507,7 +517,12 @@ impl ExportWorker { // Write documents from stream to table uploads while let Some((doc, _ts)) = stream.try_next().await? { - usage.track_database_egress_size(table_name.to_string(), doc.size() as u64, false); + usage.track_database_egress_size( + component_path.clone(), + table_name.to_string(), + doc.size() as u64, + false, + ); table_upload.write(doc).await?; } table_upload.complete().await?; @@ -525,6 +540,7 @@ impl ExportWorker { child, zip_snapshot_upload, tables, + component_ids_to_paths, snapshot_ts, by_id_indexes, system_tables, @@ -542,6 +558,7 @@ impl ExportWorker { mut writer: ChannelWriter, component_tree: ComponentTree, mut tables: BTreeMap, + component_ids_to_paths: &BTreeMap, snapshot_ts: RepeatableTimestamp, by_id_indexes: BTreeMap, system_tables: BTreeMap<(TableNamespace, TableName), TabletId>, @@ -555,6 +572,7 @@ impl ExportWorker { &component_tree, &mut zip_snapshot_upload, &mut tables, + component_ids_to_paths, snapshot_ts, &by_id_indexes, &system_tables, @@ -760,7 +778,10 @@ mod tests { use anyhow::Context; use bytes::Bytes; use common::{ - components::ComponentId, + components::{ + ComponentId, + ComponentPath, + }, document::ParsedDocument, types::{ ConvexOrigin, @@ -929,9 +950,10 @@ mod tests { assert_eq!(zip_entries, expected_export_entries); let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size["table_0"] > 0); - assert!(usage.database_egress_size["table_1"] > 0); - assert!(usage.database_egress_size["table_2"] > 0); + let component_path = ComponentPath::test_user(); + assert!(usage.database_egress_size[&(component_path.clone(), "table_0".to_string())] > 0); + assert!(usage.database_egress_size[&(component_path.clone(), "table_1".to_string())] > 0); + assert!(usage.database_egress_size[&(component_path, "table_2".to_string())] > 0); Ok(()) } @@ -1000,8 +1022,9 @@ mod tests { expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); let mut tx = db.begin(Identity::system()).await?; - let (_, child_component) = BootstrapComponentsModel::new(&mut tx) - .must_component_path_to_ids(&"component".parse()?)?; + let component_path = "component".parse()?; + let (_, child_component) = + BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?; for (path_prefix, component) in [ ("", ComponentId::Root), @@ -1041,7 +1064,7 @@ mod tests { assert_eq!(zip_entries, expected_export_entries); let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size["messages"] > 0); + assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0); Ok(()) } @@ -1062,8 +1085,9 @@ mod tests { expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); let mut tx = db.begin(Identity::system()).await?; - let (_, child_component) = BootstrapComponentsModel::new(&mut tx) - .must_component_path_to_ids(&"component".parse()?)?; + let component_path = "component".parse()?; + let (_, child_component) = + BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?; // Data in root component doesn't matter. write_test_data_in_component(&db, ComponentId::Root, "", &mut BTreeMap::new()).await?; @@ -1100,7 +1124,7 @@ mod tests { assert_eq!(zip_entries, expected_export_entries); let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size["messages"] > 0); + assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0); Ok(()) } diff --git a/crates/application/src/snapshot_import.rs b/crates/application/src/snapshot_import.rs index 0ec9773b..402e7445 100644 --- a/crates/application/src/snapshot_import.rs +++ b/crates/application/src/snapshot_import.rs @@ -2090,6 +2090,7 @@ async fn import_storage_table( .transpose()?; usage .track_storage_call( + component_path.clone(), "snapshot_import", entry.storage_id, content_type, @@ -3516,7 +3517,7 @@ a .await?; let stats = usage.gather_user_stats(); - assert!(stats.database_ingress_size[&table_name.to_string()] > 0); + assert!(stats.database_ingress_size[&(component_path, table_name.to_string())] > 0); assert_eq!(stats.storage_ingress_size, 9); Ok(()) diff --git a/crates/common/src/components/component_path.rs b/crates/common/src/components/component_path.rs index 58275fac..44ca3d59 100644 --- a/crates/common/src/components/component_path.rs +++ b/crates/common/src/components/component_path.rs @@ -72,7 +72,7 @@ impl HeapSize for ComponentName { // path can potentially change when the component tree changes during a push, so // we should resolve this path to a `ComponentId` within a transaction // as soon as possible. -#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub struct ComponentPath { path: WithHeapSize>, diff --git a/crates/database/src/bootstrap_model/user_facing.rs b/crates/database/src/bootstrap_model/user_facing.rs index 54349710..b0753bb2 100644 --- a/crates/database/src/bootstrap_model/user_facing.rs +++ b/crates/database/src/bootstrap_model/user_facing.rs @@ -134,7 +134,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { .get(self.namespace, id, version) .await; if let Ok(Some((document, _))) = &result { + let component_path = self + .tx + .must_component_path(ComponentId::from(self.namespace))?; self.tx.reads.record_read_document( + component_path, table_name, document.size(), &self.tx.usage_tracker, @@ -337,7 +341,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { .tx .virtual_system_mapping() .is_virtual_table(table_name); + let component_path = self + .tx + .must_component_path(ComponentId::from(self.namespace))?; self.tx.reads.record_read_document( + component_path, table_name.clone(), document.size(), &self.tx.usage_tracker, diff --git a/crates/database/src/committer.rs b/crates/database/src/committer.rs index 816002a9..51497f54 100644 --- a/crates/database/src/committer.rs +++ b/crates/database/src/committer.rs @@ -15,6 +15,10 @@ use common::{ TableState, TABLES_TABLE, }, + components::{ + ComponentId, + ComponentPath, + }, document::{ DocumentUpdate, ParsedDocument, @@ -118,7 +122,9 @@ use crate::{ WriteSource, }, writes::DocumentWrite, + ComponentRegistry, Transaction, + TransactionReadSet, }; enum PersistenceWrite { @@ -697,6 +703,7 @@ impl Committer { metrics::log_write_tx(&transaction); let table_mapping = transaction.table_mapping.clone(); + let component_registry = transaction.component_registry.clone(); let usage_tracking = transaction.usage_tracker.clone(); let ValidatedCommit { index_writes, @@ -720,6 +727,7 @@ impl Committer { &index_writes, &document_writes, &table_mapping, + &component_registry, ); Self::write_to_persistence(persistence, index_writes, document_writes).await?; Ok(PersistenceWrite::Commit { @@ -743,13 +751,24 @@ impl Committer { index_writes: &BTreeSet<(Timestamp, DatabaseIndexUpdate)>, document_writes: &Vec, table_mapping: &TableMapping, + component_registry: &ComponentRegistry, ) { for (_, index_write) in index_writes { if let DatabaseIndexValue::NonClustered(doc) = index_write.value { - if let Ok(table_name) = table_mapping.tablet_name(doc.tablet_id) { + let tablet_id = doc.tablet_id; + let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else { + continue; + }; + let component_id = ComponentId::from(table_namespace); + let component_path = component_registry + .get_component_path(component_id, &mut TransactionReadSet::new()) + // It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root. + .unwrap_or(ComponentPath::root()); + if let Ok(table_name) = table_mapping.tablet_name(tablet_id) { // Index metadata is never a vector // Database bandwidth for index writes usage_tracker.track_database_ingress_size( + component_path, table_name.to_string(), index_write.key.size() as u64, // Exclude indexes on system tables or reserved system indexes on user @@ -768,16 +787,27 @@ impl Committer { } = validated_write; if let Some(document) = document { let document_write_size = document_id.size() + document.size(); - if let Ok(table_name) = table_mapping.tablet_name(document.id().tablet_id) { + let tablet_id = document.id().tablet_id; + let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else { + continue; + }; + let component_id = ComponentId::from(table_namespace); + let component_path = component_registry + .get_component_path(component_id, &mut TransactionReadSet::new()) + // It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root. + .unwrap_or(ComponentPath::root()); + if let Ok(table_name) = table_mapping.tablet_name(tablet_id) { // Database bandwidth for document writes if *doc_in_vector_index == DocInVectorIndex::Absent { usage_tracker.track_database_ingress_size( + component_path, table_name.to_string(), document_write_size as u64, table_name.is_system(), ); } else { usage_tracker.track_vector_ingress_size( + component_path, table_name.to_string(), document_write_size as u64, table_name.is_system(), diff --git a/crates/database/src/component_registry.rs b/crates/database/src/component_registry.rs index 193a7d13..1bb32c04 100644 --- a/crates/database/src/component_registry.rs +++ b/crates/database/src/component_registry.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use anyhow::Context; use common::{ bootstrap_model::{ components::{ @@ -136,6 +137,27 @@ impl ComponentRegistry { Some(ComponentPath::from(path)) } + pub fn must_component_path( + &self, + component_id: ComponentId, + reads: &mut TransactionReadSet, + ) -> anyhow::Result { + self.get_component_path(component_id, reads) + .with_context(|| format!("Component {component_id:?} not found")) + } + + pub fn component_path_from_document_id( + &self, + table_mapping: &TableMapping, + id: ResolvedDocumentId, + reads: &mut TransactionReadSet, + ) -> anyhow::Result> { + let tablet_id = id.tablet_id; + let table_namespace = table_mapping.tablet_namespace(tablet_id)?; + let component_id = ComponentId::from(table_namespace); + Ok(self.get_component_path(component_id, reads)) + } + pub fn all_component_paths( &self, reads: &mut TransactionReadSet, diff --git a/crates/database/src/database.rs b/crates/database/src/database.rs index 8234e0c7..3eafabe6 100644 --- a/crates/database/src/database.rs +++ b/crates/database/src/database.rs @@ -2062,9 +2062,10 @@ impl Database { let timer = metrics::vector::vector_search_timer(); let usage = FunctionUsageTracker::new(); let snapshot = self.snapshot(ts)?; + let component_id = query.component_id; let table_mapping = snapshot .table_mapping() - .namespace(TableNamespace::from(query.component_id)); + .namespace(TableNamespace::from(component_id)); if !table_mapping.name_exists(query.index_name.table()) { return Ok((vec![], usage.gather_user_stats())); } @@ -2091,7 +2092,11 @@ impl Database { .map(|r| r.to_public(table_number)) .collect(); let size: u64 = results.iter().map(|row| row.size() as u64).sum(); + let component_path = snapshot + .component_registry + .must_component_path(component_id, &mut TransactionReadSet::new())?; usage.track_vector_egress_size( + component_path, table_mapping.tablet_name(*index_name.table())?.to_string(), size, // We don't have system owned vector indexes. diff --git a/crates/database/src/query/index_range.rs b/crates/database/src/query/index_range.rs index 7a2d2c46..9cae0aff 100644 --- a/crates/database/src/query/index_range.rs +++ b/crates/database/src/query/index_range.rs @@ -6,6 +6,7 @@ use std::{ use async_trait::async_trait; use common::{ bootstrap_model::index::database_index::IndexedFields, + components::ComponentId, document::DeveloperDocument, index::IndexKeyBytes, interval::Interval, @@ -212,7 +213,9 @@ impl IndexRange { .record_read_document(&v, self.printable_index_name.table())?; // Database bandwidth for index reads + let component_path = tx.must_component_path(ComponentId::from(self.namespace))?; tx.usage_tracker.track_database_egress_size( + component_path, self.printable_index_name.table().to_string(), index_bytes as u64, self.printable_index_name.is_system_owned(), diff --git a/crates/database/src/reads.rs b/crates/database/src/reads.rs index 0fdf6be8..691fb486 100644 --- a/crates/database/src/reads.rs +++ b/crates/database/src/reads.rs @@ -7,6 +7,7 @@ use std::{ use cmd_util::env::env_config; use common::{ bootstrap_model::index::database_index::IndexedFields, + components::ComponentPath, document::PackedDocument, interval::{ Interval, @@ -336,6 +337,7 @@ impl TransactionReadSet { pub fn record_read_document( &mut self, + component_path: ComponentPath, table_name: TableName, document_size: usize, usage_tracker: &FunctionUsageTracker, @@ -344,6 +346,7 @@ impl TransactionReadSet { // Database bandwidth for document reads let is_system_table = table_name.is_system() && !is_virtual_table; usage_tracker.track_database_egress_size( + component_path, table_name.to_string(), document_size as u64, is_system_table, diff --git a/crates/database/src/snapshot_manager.rs b/crates/database/src/snapshot_manager.rs index 697e13a5..ababc97f 100644 --- a/crates/database/src/snapshot_manager.rs +++ b/crates/database/src/snapshot_manager.rs @@ -10,6 +10,10 @@ use anyhow::Context; use async_trait::async_trait; use common::{ bootstrap_model::tables::TableState, + components::{ + ComponentId, + ComponentPath, + }, document::{ DocumentUpdate, ResolvedDocument, @@ -53,6 +57,7 @@ use crate::{ ComponentRegistry, TableRegistry, TableSummary, + TransactionReadSet, }; const MAX_TRANSACTION_WINDOW: Duration = Duration::from_secs(10); @@ -349,6 +354,11 @@ impl Snapshot { Ok(document_storage_by_table) } + + pub fn component_ids_to_paths(&self) -> BTreeMap { + self.component_registry + .all_component_paths(&mut TransactionReadSet::new()) + } } impl SnapshotManager { diff --git a/crates/database/src/transaction.rs b/crates/database/src/transaction.rs index 9299e1d8..c80e30e7 100644 --- a/crates/database/src/transaction.rs +++ b/crates/database/src/transaction.rs @@ -703,6 +703,28 @@ impl Transaction { .get_component_path(component_id, &mut self.reads) } + pub fn must_component_path( + &mut self, + component_id: ComponentId, + ) -> anyhow::Result { + self.component_registry + .must_component_path(component_id, &mut self.reads) + } + + /// Get the component path for a document ID. This might be None when table + /// namespaces for new components are created in `start_push`, but + /// components have not yet been created. + pub fn component_path_for_document_id( + &mut self, + id: ResolvedDocumentId, + ) -> anyhow::Result> { + self.component_registry.component_path_from_document_id( + self.metadata.table_mapping(), + id, + &mut self.reads, + ) + } + // XXX move to table model? #[cfg(any(test, feature = "testing"))] pub async fn create_system_table_testing( @@ -859,7 +881,11 @@ impl Transaction { let result = match range_results.into_iter().next() { Some((_, doc, timestamp)) => { let is_virtual_table = self.virtual_system_mapping().is_virtual_table(&table_name); + let component_path = self + .component_path_for_document_id(doc.id())? + .unwrap_or_default(); self.reads.record_read_document( + component_path, table_name, doc.size(), &self.usage_tracker, @@ -993,7 +1019,11 @@ impl Transaction { table_name: &TableName, ) -> anyhow::Result<()> { let is_virtual_table = self.virtual_system_mapping().is_virtual_table(table_name); + let component_path = self + .component_path_for_document_id(document.id())? + .unwrap_or_default(); self.reads.record_read_document( + component_path, table_name.clone(), document.size(), &self.usage_tracker, @@ -1061,6 +1091,7 @@ pub struct IndexRangeRequest { pub struct FinalTransaction { pub(crate) begin_timestamp: RepeatableTimestamp, pub(crate) table_mapping: TableMapping, + pub(crate) component_registry: ComponentRegistry, pub(crate) reads: TransactionReadSet, pub(crate) writes: Writes, @@ -1082,6 +1113,7 @@ impl FinalTransaction { let begin_timestamp = transaction.begin_timestamp(); let table_mapping = transaction.table_mapping().clone(); + let component_registry = transaction.component_registry.deref().clone(); // Note that we do a best effort validation for memory index sizes. We // use the latest snapshot instead of the transaction base snapshot. This // is both more accurate and also avoids pedant hitting transient errors. @@ -1090,6 +1122,7 @@ impl FinalTransaction { Ok(Self { begin_timestamp, table_mapping, + component_registry, reads: transaction.reads, writes: transaction.writes.into_flat()?, usage_tracker: transaction.usage_tracker.clone(), diff --git a/crates/events/src/usage.rs b/crates/events/src/usage.rs index 3a731230..3f092845 100644 --- a/crates/events/src/usage.rs +++ b/crates/events/src/usage.rs @@ -67,6 +67,7 @@ pub enum UsageEvent { /// snapshot import/export) StorageCall { id: String, + component_path: Option, storage_id: String, call: String, content_type: Option, diff --git a/crates/file_storage/src/core.rs b/crates/file_storage/src/core.rs index fccc7038..bc480471 100644 --- a/crates/file_storage/src/core.rs +++ b/crates/file_storage/src/core.rs @@ -7,7 +7,10 @@ use std::{ use anyhow::Context; use bytes::Bytes; use common::{ - components::ComponentId, + components::{ + ComponentId, + ComponentPath, + }, runtime::{ Runtime, UnixTimestamp, @@ -240,8 +243,13 @@ impl TransactionalFileStorage { let stream = storage_get_stream.stream; let content_length = ContentLength(storage_get_stream.content_length as u64); - let call_tracker = - usage_tracker.track_storage_call("get range", storage_id, content_type.clone(), sha256); + let call_tracker = usage_tracker.track_storage_call( + ComponentPath::TODO(), + "get range", + storage_id, + content_type.clone(), + sha256, + ); Ok(FileRangeStream { content_length, @@ -415,12 +423,13 @@ impl FileStorage { .transactional_file_storage .store_file_entry(&mut tx, namespace, entry) .await?; + let component_path = tx.must_component_path(ComponentId::from(namespace))?; self.database .commit_with_write_source(tx, "file_storage_store_file") .await?; usage_tracker - .track_storage_call("store", storage_id, content_type, sha256) + .track_storage_call(component_path, "store", storage_id, content_type, sha256) .track_storage_ingress_size(size as u64); Ok(virtual_id) } diff --git a/crates/isolate/src/environment/action/storage.rs b/crates/isolate/src/environment/action/storage.rs index 2adabd2b..8508906a 100644 --- a/crates/isolate/src/environment/action/storage.rs +++ b/crates/isolate/src/environment/action/storage.rs @@ -2,6 +2,7 @@ use std::str::FromStr; use anyhow::Context; use common::{ + components::ComponentPath, runtime::Runtime, sha256::{ DigestHeader, @@ -80,7 +81,13 @@ impl TaskExecutor { .await?; self.usage_tracker - .track_storage_call("store", storage_id, content_type, sha256) + .track_storage_call( + ComponentPath::TODO(), + "store", + storage_id, + content_type, + sha256, + ) .track_storage_ingress_size(size as u64); Ok(storage_doc_id) diff --git a/crates/pb/protos/usage.proto b/crates/pb/protos/usage.proto index d0714289..5762d1ea 100644 --- a/crates/pb/protos/usage.proto +++ b/crates/pb/protos/usage.proto @@ -13,6 +13,7 @@ message FunctionUsageStats { } message CounterWithTag { - optional string name = 1; + optional string component_path = 3; + optional string table_name = 1; optional uint64 count = 2; } diff --git a/crates/usage_tracking/src/lib.rs b/crates/usage_tracking/src/lib.rs index d417506f..614ad6bc 100644 --- a/crates/usage_tracking/src/lib.rs +++ b/crates/usage_tracking/src/lib.rs @@ -10,6 +10,7 @@ use std::{ use anyhow::Context; use common::{ + components::ComponentPath, execution_context::ExecutionId, types::{ ModuleEnvironment, @@ -179,10 +180,10 @@ impl UsageCounter { ) { // Merge the storage stats. let (component_path, udf_id) = udf_path.clone().into_component_and_udf_path(); - for (storage_api, function_count) in stats.storage_calls { + for ((component_path, storage_api), function_count) in stats.storage_calls { usage_metrics.push(UsageEvent::FunctionStorageCalls { id: execution_id.to_string(), - component_path: component_path.clone(), + component_path: component_path.serialize(), udf_id: udf_id.clone(), call: storage_api, count: function_count, @@ -196,40 +197,40 @@ impl UsageCounter { egress: stats.storage_egress_size, }); // Merge "by table" bandwidth stats. - for (table_name, ingress_size) in stats.database_ingress_size { + for ((component_path, table_name), ingress_size) in stats.database_ingress_size { usage_metrics.push(UsageEvent::DatabaseBandwidth { id: execution_id.to_string(), - component_path: component_path.clone(), + component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: ingress_size, egress: 0, }); } - for (table_name, egress_size) in stats.database_egress_size { + for ((component_path, table_name), egress_size) in stats.database_egress_size { usage_metrics.push(UsageEvent::DatabaseBandwidth { id: execution_id.to_string(), - component_path: component_path.clone(), + component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: 0, egress: egress_size, }); } - for (table_name, ingress_size) in stats.vector_ingress_size { + for ((component_path, table_name), ingress_size) in stats.vector_ingress_size { usage_metrics.push(UsageEvent::VectorBandwidth { id: execution_id.to_string(), - component_path: component_path.clone(), + component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: ingress_size, egress: 0, }); } - for (table_name, egress_size) in stats.vector_egress_size { + for ((component_path, table_name), egress_size) in stats.vector_egress_size { usage_metrics.push(UsageEvent::VectorBandwidth { id: execution_id.to_string(), - component_path: component_path.clone(), + component_path: component_path.serialize(), udf_id: udf_id.clone(), table_name, ingress: 0, @@ -245,6 +246,7 @@ impl UsageCounter { pub trait StorageUsageTracker: Send + Sync { fn track_storage_call( &self, + component_path: ComponentPath, storage_api: &'static str, storage_id: StorageUuid, content_type: Option, @@ -294,6 +296,7 @@ impl StorageCallTracker for IndependentStorageCallTracker { impl StorageUsageTracker for UsageCounter { fn track_storage_call( &self, + component_path: ComponentPath, storage_api: &'static str, storage_id: StorageUuid, content_type: Option, @@ -303,6 +306,7 @@ impl StorageUsageTracker for UsageCounter { metrics::storage::log_storage_call(); self.usage_logger.record(vec![UsageEvent::StorageCall { id: execution_id.to_string(), + component_path: component_path.serialize(), // Ideally we would track the Id<_storage> instead of the StorageUuid // but it's a bit annoying for now, so just going with this. storage_id: storage_id.to_string(), @@ -361,6 +365,7 @@ impl FunctionUsageTracker { // calling this method. pub fn track_database_ingress_size( &self, + component_path: ComponentPath, table_name: String, ingress_size: u64, skip_logging: bool, @@ -372,11 +377,12 @@ impl FunctionUsageTracker { let mut state = self.state.lock(); state .database_ingress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += ingress_size); + .mutate_entry_or_default((component_path, table_name), |count| *count += ingress_size); } pub fn track_database_egress_size( &self, + component_path: ComponentPath, table_name: String, egress_size: u64, skip_logging: bool, @@ -388,7 +394,7 @@ impl FunctionUsageTracker { let mut state = self.state.lock(); state .database_egress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += egress_size); + .mutate_entry_or_default((component_path, table_name), |count| *count += egress_size); } // Tracks the vector ingress surcharge and database usage for documents @@ -405,6 +411,7 @@ impl FunctionUsageTracker { // have at least one vector that's actually used in the index. pub fn track_vector_ingress_size( &self, + component_path: ComponentPath, table_name: String, ingress_size: u64, skip_logging: bool, @@ -416,14 +423,15 @@ impl FunctionUsageTracker { // Note that vector search counts as both database and vector bandwidth // per the comment above. let mut state = self.state.lock(); + let key = (component_path, table_name); state .database_ingress_size - .mutate_entry_or_default(table_name.clone(), |count| { + .mutate_entry_or_default(key.clone(), |count| { *count += ingress_size; }); state .vector_ingress_size - .mutate_entry_or_default(table_name.clone(), |count| { + .mutate_entry_or_default(key, |count| { *count += ingress_size; }); } @@ -443,6 +451,7 @@ impl FunctionUsageTracker { // impact a vector index. pub fn track_vector_egress_size( &self, + component_path: ComponentPath, table_name: String, egress_size: u64, skip_logging: bool, @@ -454,12 +463,13 @@ impl FunctionUsageTracker { // Note that vector search counts as both database and vector bandwidth // per the comment above. let mut state = self.state.lock(); + let key = (component_path, table_name); state .database_egress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += egress_size); + .mutate_entry_or_default(key.clone(), |count| *count += egress_size); state .vector_egress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += egress_size); + .mutate_entry_or_default(key, |count| *count += egress_size); } } @@ -483,6 +493,7 @@ impl StorageCallTracker for FunctionUsageTracker { impl StorageUsageTracker for FunctionUsageTracker { fn track_storage_call( &self, + component_path: ComponentPath, storage_api: &'static str, _storage_id: StorageUuid, _content_type: Option, @@ -492,7 +503,9 @@ impl StorageUsageTracker for FunctionUsageTracker { metrics::storage::log_storage_call(); state .storage_calls - .mutate_entry_or_default(storage_api.to_string(), |count| *count += 1); + .mutate_entry_or_default((component_path, storage_api.to_string()), |count| { + *count += 1 + }); Box::new(self.clone()) } } @@ -504,13 +517,13 @@ type StorageAPI = String; #[derive(Debug, Clone, PartialEq, Eq, Default)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub struct FunctionUsageStats { - pub storage_calls: WithHeapSize>, + pub storage_calls: WithHeapSize>, pub storage_ingress_size: u64, pub storage_egress_size: u64, - pub database_ingress_size: WithHeapSize>, - pub database_egress_size: WithHeapSize>, - pub vector_ingress_size: WithHeapSize>, - pub vector_egress_size: WithHeapSize>, + pub database_ingress_size: WithHeapSize>, + pub database_egress_size: WithHeapSize>, + pub vector_ingress_size: WithHeapSize>, + pub vector_egress_size: WithHeapSize>, } impl FunctionUsageStats { @@ -527,51 +540,57 @@ impl FunctionUsageStats { fn merge(&mut self, other: Self) { // Merge the storage stats. - for (storage_api, function_count) in other.storage_calls { + for (key, function_count) in other.storage_calls { self.storage_calls - .mutate_entry_or_default(storage_api, |count| *count += function_count); + .mutate_entry_or_default(key, |count| *count += function_count); } self.storage_ingress_size += other.storage_ingress_size; self.storage_egress_size += other.storage_egress_size; // Merge "by table" bandwidth other. - for (table_name, ingress_size) in other.database_ingress_size { + for (key, ingress_size) in other.database_ingress_size { self.database_ingress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += ingress_size); + .mutate_entry_or_default(key.clone(), |count| *count += ingress_size); } - for (table_name, egress_size) in other.database_egress_size { + for (key, egress_size) in other.database_egress_size { self.database_egress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += egress_size); + .mutate_entry_or_default(key.clone(), |count| *count += egress_size); } - for (table_name, ingress_size) in other.vector_ingress_size { + for (key, ingress_size) in other.vector_ingress_size { self.vector_ingress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += ingress_size); + .mutate_entry_or_default(key.clone(), |count| *count += ingress_size); } - for (table_name, egress_size) in other.vector_egress_size { + for (key, egress_size) in other.vector_egress_size { self.vector_egress_size - .mutate_entry_or_default(table_name.clone(), |count| *count += egress_size); + .mutate_entry_or_default(key.clone(), |count| *count += egress_size); } } } -fn to_by_tag_count(counts: impl Iterator) -> Vec { +fn to_by_tag_count( + counts: impl Iterator, +) -> Vec { counts - .map(|(tag, count)| CounterWithTagProto { - name: Some(tag), - count: Some(count), - }) + .map( + |((component_path, table_name), count)| CounterWithTagProto { + component_path: component_path.serialize(), + table_name: Some(table_name), + count: Some(count), + }, + ) .collect() } fn from_by_tag_count( counts: Vec, -) -> anyhow::Result> { +) -> anyhow::Result> { let counts: Vec<_> = counts .into_iter() .map(|c| -> anyhow::Result<_> { - let name = c.name.context("Missing `tag` field")?; + let component_path = ComponentPath::deserialize(c.component_path.as_deref())?; + let name = c.table_name.context("Missing `tag` field")?; let count = c.count.context("Missing `count` field")?; - Ok((name, count)) + Ok(((component_path, name), count)) }) .try_collect()?; Ok(counts.into_iter()) diff --git a/crates/value/src/table_mapping.rs b/crates/value/src/table_mapping.rs index 21178e4b..8ed045b2 100644 --- a/crates/value/src/table_mapping.rs +++ b/crates/value/src/table_mapping.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use anyhow::Context; use errors::ErrorMetadata; use imbl::OrdMap; use serde::Serialize; @@ -204,21 +205,21 @@ impl TableMapping { self.tablet_to_table .get(&id) .map(|(_, _, name)| name.clone()) - .ok_or_else(|| anyhow::anyhow!("cannot find table {id:?}")) + .with_context(|| format!("cannot find table {id:?}")) } pub fn tablet_number(&self, id: TabletId) -> anyhow::Result { self.tablet_to_table .get(&id) .map(|(_, number, ..)| *number) - .ok_or_else(|| anyhow::anyhow!("cannot find table {id:?}")) + .with_context(|| format!("cannot find table {id:?}")) } pub fn tablet_namespace(&self, id: TabletId) -> anyhow::Result { self.tablet_to_table .get(&id) .map(|(namespace, ..)| *namespace) - .ok_or_else(|| anyhow::anyhow!("cannot find table {id:?}")) + .with_context(|| format!("cannot find table {id:?}")) } pub fn tablet_to_name(&self) -> impl Fn(TabletId) -> anyhow::Result + '_ {