Skip to content

Commit

Permalink
Add ComponentPath to FunctionUsageStats (#29839)
Browse files Browse the repository at this point in the history
This PR supports breaking usage down by component path. This makes it possible to see within a transaction how much bandwidth was used in each component, for example. There are still some TODOs mostly around snapshot import, export and file storage.

Notable changes:
- adding `ComponentRegistry` to `FinalTransaction`
- some new helpers to get `ComponentPath` given document id
- pass around `BTreeMap<ComponentId, ComponentPath>` from `Snapshot` to avoid needing to create a transaction just to convert `ComponentId` to `ComponentPath` for snapshot export

GitOrigin-RevId: 86e4f205081331ae1ddae5a6192df558bb7c4691
  • Loading branch information
emmaling27 authored and Convex, Inc. committed Sep 12, 2024
1 parent daff7a8 commit 5be8e0e
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 66 deletions.
48 changes: 36 additions & 12 deletions crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common::{
components::{
ComponentId,
ComponentName,
ComponentPath,
},
document::{
ParsedDocument,
Expand Down Expand Up @@ -289,7 +290,7 @@ impl<RT: Runtime> ExportWorker<RT> {
) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> {
tracing::info!("Beginning snapshot export...");
let storage = &self.storage;
let (ts, tables, by_id_indexes, system_tables, component_tree) = {
let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables, component_tree) = {
let mut tx = self.database.begin(Identity::system()).await?;
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
let component_tree = ComponentTree::new(&mut tx, component).await?;
Expand All @@ -309,6 +310,7 @@ impl<RT: Runtime> ExportWorker<RT> {
)
})
.collect();
let component_ids_to_paths = snapshot.component_ids_to_paths();
let system_tables = snapshot
.table_registry
.iter_active_system_tables()
Expand All @@ -317,6 +319,7 @@ impl<RT: Runtime> ExportWorker<RT> {
(
tx.begin_timestamp(),
tables,
component_ids_to_paths,
by_id_indexes,
system_tables,
component_tree,
Expand All @@ -335,6 +338,7 @@ impl<RT: Runtime> ExportWorker<RT> {
writer,
component_tree,
tables.clone(),
&component_ids_to_paths,
ts,
by_id_indexes,
system_tables,
Expand All @@ -355,13 +359,18 @@ impl<RT: Runtime> ExportWorker<RT> {
component_tree: &'a ComponentTree,
zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>,
tables: &'a mut BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
snapshot_ts: RepeatableTimestamp,
by_id_indexes: &BTreeMap<TabletId, IndexId>,
system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
include_storage: bool,
usage: FunctionUsageTracker,
) -> anyhow::Result<()> {
let namespace: TableNamespace = component_tree.id.into();
let component_path = component_ids_to_paths
.get(&component_tree.id)
.cloned()
.unwrap_or_default();
let tablet_ids: BTreeSet<_> = tables
.iter()
.filter(|(_, (ns, ..))| *ns == namespace)
Expand Down Expand Up @@ -468,6 +477,7 @@ impl<RT: Runtime> ExportWorker<RT> {
.transpose()?;
usage
.track_storage_call(
component_path.clone(),
"snapshot_export",
file_storage_entry.storage_id.clone(),
content_type,
Expand Down Expand Up @@ -507,7 +517,12 @@ impl<RT: Runtime> ExportWorker<RT> {

// Write documents from stream to table uploads
while let Some((doc, _ts)) = stream.try_next().await? {
usage.track_database_egress_size(table_name.to_string(), doc.size() as u64, false);
usage.track_database_egress_size(
component_path.clone(),
table_name.to_string(),
doc.size() as u64,
false,
);
table_upload.write(doc).await?;
}
table_upload.complete().await?;
Expand All @@ -525,6 +540,7 @@ impl<RT: Runtime> ExportWorker<RT> {
child,
zip_snapshot_upload,
tables,
component_ids_to_paths,
snapshot_ts,
by_id_indexes,
system_tables,
Expand All @@ -542,6 +558,7 @@ impl<RT: Runtime> ExportWorker<RT> {
mut writer: ChannelWriter,
component_tree: ComponentTree,
mut tables: BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
snapshot_ts: RepeatableTimestamp,
by_id_indexes: BTreeMap<TabletId, IndexId>,
system_tables: BTreeMap<(TableNamespace, TableName), TabletId>,
Expand All @@ -555,6 +572,7 @@ impl<RT: Runtime> ExportWorker<RT> {
&component_tree,
&mut zip_snapshot_upload,
&mut tables,
component_ids_to_paths,
snapshot_ts,
&by_id_indexes,
&system_tables,
Expand Down Expand Up @@ -760,7 +778,10 @@ mod tests {
use anyhow::Context;
use bytes::Bytes;
use common::{
components::ComponentId,
components::{
ComponentId,
ComponentPath,
},
document::ParsedDocument,
types::{
ConvexOrigin,
Expand Down Expand Up @@ -929,9 +950,10 @@ mod tests {
assert_eq!(zip_entries, expected_export_entries);

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size["table_0"] > 0);
assert!(usage.database_egress_size["table_1"] > 0);
assert!(usage.database_egress_size["table_2"] > 0);
let component_path = ComponentPath::test_user();
assert!(usage.database_egress_size[&(component_path.clone(), "table_0".to_string())] > 0);
assert!(usage.database_egress_size[&(component_path.clone(), "table_1".to_string())] > 0);
assert!(usage.database_egress_size[&(component_path, "table_2".to_string())] > 0);

Ok(())
}
Expand Down Expand Up @@ -1000,8 +1022,9 @@ mod tests {
expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string());

let mut tx = db.begin(Identity::system()).await?;
let (_, child_component) = BootstrapComponentsModel::new(&mut tx)
.must_component_path_to_ids(&"component".parse()?)?;
let component_path = "component".parse()?;
let (_, child_component) =
BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?;

for (path_prefix, component) in [
("", ComponentId::Root),
Expand Down Expand Up @@ -1041,7 +1064,7 @@ mod tests {
assert_eq!(zip_entries, expected_export_entries);

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size["messages"] > 0);
assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0);
Ok(())
}

Expand All @@ -1062,8 +1085,9 @@ mod tests {
expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string());

let mut tx = db.begin(Identity::system()).await?;
let (_, child_component) = BootstrapComponentsModel::new(&mut tx)
.must_component_path_to_ids(&"component".parse()?)?;
let component_path = "component".parse()?;
let (_, child_component) =
BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?;

// Data in root component doesn't matter.
write_test_data_in_component(&db, ComponentId::Root, "", &mut BTreeMap::new()).await?;
Expand Down Expand Up @@ -1100,7 +1124,7 @@ mod tests {
assert_eq!(zip_entries, expected_export_entries);

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size["messages"] > 0);
assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0);
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion crates/application/src/snapshot_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,7 @@ async fn import_storage_table<RT: Runtime>(
.transpose()?;
usage
.track_storage_call(
component_path.clone(),
"snapshot_import",
entry.storage_id,
content_type,
Expand Down Expand Up @@ -3516,7 +3517,7 @@ a
.await?;

let stats = usage.gather_user_stats();
assert!(stats.database_ingress_size[&table_name.to_string()] > 0);
assert!(stats.database_ingress_size[&(component_path, table_name.to_string())] > 0);
assert_eq!(stats.storage_ingress_size, 9);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/components/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl HeapSize for ComponentName {
// path can potentially change when the component tree changes during a push, so
// we should resolve this path to a `ComponentId` within a transaction
// as soon as possible.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct ComponentPath {
path: WithHeapSize<Vec<ComponentName>>,
Expand Down
8 changes: 8 additions & 0 deletions crates/database/src/bootstrap_model/user_facing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
.get(self.namespace, id, version)
.await;
if let Ok(Some((document, _))) = &result {
let component_path = self
.tx
.must_component_path(ComponentId::from(self.namespace))?;
self.tx.reads.record_read_document(
component_path,
table_name,
document.size(),
&self.tx.usage_tracker,
Expand Down Expand Up @@ -337,7 +341,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
.tx
.virtual_system_mapping()
.is_virtual_table(table_name);
let component_path = self
.tx
.must_component_path(ComponentId::from(self.namespace))?;
self.tx.reads.record_read_document(
component_path,
table_name.clone(),
document.size(),
&self.tx.usage_tracker,
Expand Down
34 changes: 32 additions & 2 deletions crates/database/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ use common::{
TableState,
TABLES_TABLE,
},
components::{
ComponentId,
ComponentPath,
},
document::{
DocumentUpdate,
ParsedDocument,
Expand Down Expand Up @@ -118,7 +122,9 @@ use crate::{
WriteSource,
},
writes::DocumentWrite,
ComponentRegistry,
Transaction,
TransactionReadSet,
};

enum PersistenceWrite {
Expand Down Expand Up @@ -697,6 +703,7 @@ impl<RT: Runtime> Committer<RT> {
metrics::log_write_tx(&transaction);

let table_mapping = transaction.table_mapping.clone();
let component_registry = transaction.component_registry.clone();
let usage_tracking = transaction.usage_tracker.clone();
let ValidatedCommit {
index_writes,
Expand All @@ -720,6 +727,7 @@ impl<RT: Runtime> Committer<RT> {
&index_writes,
&document_writes,
&table_mapping,
&component_registry,
);
Self::write_to_persistence(persistence, index_writes, document_writes).await?;
Ok(PersistenceWrite::Commit {
Expand All @@ -743,13 +751,24 @@ impl<RT: Runtime> Committer<RT> {
index_writes: &BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
document_writes: &Vec<ValidatedDocumentWrite>,
table_mapping: &TableMapping,
component_registry: &ComponentRegistry,
) {
for (_, index_write) in index_writes {
if let DatabaseIndexValue::NonClustered(doc) = index_write.value {
if let Ok(table_name) = table_mapping.tablet_name(doc.tablet_id) {
let tablet_id = doc.tablet_id;
let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else {
continue;
};
let component_id = ComponentId::from(table_namespace);
let component_path = component_registry
.get_component_path(component_id, &mut TransactionReadSet::new())
// It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root.
.unwrap_or(ComponentPath::root());
if let Ok(table_name) = table_mapping.tablet_name(tablet_id) {
// Index metadata is never a vector
// Database bandwidth for index writes
usage_tracker.track_database_ingress_size(
component_path,
table_name.to_string(),
index_write.key.size() as u64,
// Exclude indexes on system tables or reserved system indexes on user
Expand All @@ -768,16 +787,27 @@ impl<RT: Runtime> Committer<RT> {
} = validated_write;
if let Some(document) = document {
let document_write_size = document_id.size() + document.size();
if let Ok(table_name) = table_mapping.tablet_name(document.id().tablet_id) {
let tablet_id = document.id().tablet_id;
let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else {
continue;
};
let component_id = ComponentId::from(table_namespace);
let component_path = component_registry
.get_component_path(component_id, &mut TransactionReadSet::new())
// It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root.
.unwrap_or(ComponentPath::root());
if let Ok(table_name) = table_mapping.tablet_name(tablet_id) {
// Database bandwidth for document writes
if *doc_in_vector_index == DocInVectorIndex::Absent {
usage_tracker.track_database_ingress_size(
component_path,
table_name.to_string(),
document_write_size as u64,
table_name.is_system(),
);
} else {
usage_tracker.track_vector_ingress_size(
component_path,
table_name.to_string(),
document_write_size as u64,
table_name.is_system(),
Expand Down
22 changes: 22 additions & 0 deletions crates/database/src/component_registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;

use anyhow::Context;
use common::{
bootstrap_model::{
components::{
Expand Down Expand Up @@ -136,6 +137,27 @@ impl ComponentRegistry {
Some(ComponentPath::from(path))
}

pub fn must_component_path(
&self,
component_id: ComponentId,
reads: &mut TransactionReadSet,
) -> anyhow::Result<ComponentPath> {
self.get_component_path(component_id, reads)
.with_context(|| format!("Component {component_id:?} not found"))
}

pub fn component_path_from_document_id(
&self,
table_mapping: &TableMapping,
id: ResolvedDocumentId,
reads: &mut TransactionReadSet,
) -> anyhow::Result<Option<ComponentPath>> {
let tablet_id = id.tablet_id;
let table_namespace = table_mapping.tablet_namespace(tablet_id)?;
let component_id = ComponentId::from(table_namespace);
Ok(self.get_component_path(component_id, reads))
}

pub fn all_component_paths(
&self,
reads: &mut TransactionReadSet,
Expand Down
7 changes: 6 additions & 1 deletion crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2062,9 +2062,10 @@ impl<RT: Runtime> Database<RT> {
let timer = metrics::vector::vector_search_timer();
let usage = FunctionUsageTracker::new();
let snapshot = self.snapshot(ts)?;
let component_id = query.component_id;
let table_mapping = snapshot
.table_mapping()
.namespace(TableNamespace::from(query.component_id));
.namespace(TableNamespace::from(component_id));
if !table_mapping.name_exists(query.index_name.table()) {
return Ok((vec![], usage.gather_user_stats()));
}
Expand All @@ -2091,7 +2092,11 @@ impl<RT: Runtime> Database<RT> {
.map(|r| r.to_public(table_number))
.collect();
let size: u64 = results.iter().map(|row| row.size() as u64).sum();
let component_path = snapshot
.component_registry
.must_component_path(component_id, &mut TransactionReadSet::new())?;
usage.track_vector_egress_size(
component_path,
table_mapping.tablet_name(*index_name.table())?.to_string(),
size,
// We don't have system owned vector indexes.
Expand Down
3 changes: 3 additions & 0 deletions crates/database/src/query/index_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use async_trait::async_trait;
use common::{
bootstrap_model::index::database_index::IndexedFields,
components::ComponentId,
document::DeveloperDocument,
index::IndexKeyBytes,
interval::Interval,
Expand Down Expand Up @@ -212,7 +213,9 @@ impl IndexRange {
.record_read_document(&v, self.printable_index_name.table())?;

// Database bandwidth for index reads
let component_path = tx.must_component_path(ComponentId::from(self.namespace))?;
tx.usage_tracker.track_database_egress_size(
component_path,
self.printable_index_name.table().to_string(),
index_bytes as u64,
self.printable_index_name.is_system_owned(),
Expand Down
Loading

0 comments on commit 5be8e0e

Please sign in to comment.