From 33d15879bd7eababa9bd543b99310f0e2ffc3a16 Mon Sep 17 00:00:00 2001 From: Emma Forman Ling Date: Mon, 23 Sep 2024 15:13:22 -0700 Subject: [PATCH] Fix TableNamespace todo in snapshot import (#29681) GitOrigin-RevId: 7aa5306411e509bfd553281487713d6590749b2b --- crates/application/src/snapshot_import.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/crates/application/src/snapshot_import.rs b/crates/application/src/snapshot_import.rs index 4db262af..ceeea55b 100644 --- a/crates/application/src/snapshot_import.rs +++ b/crates/application/src/snapshot_import.rs @@ -768,7 +768,7 @@ impl SnapshotImportWorker { let object_key = object_key.clone(); async move { self.read_snapshot_import(&object_key).await } }; - let objects = parse_objects(format.clone(), component_path, body_stream).boxed(); + let objects = parse_objects(format.clone(), component_path.clone(), body_stream).boxed(); // Remapping could be more extensive here, it's just relatively simple to handle // optional types. We do remapping after parsing rather than during parsing @@ -778,9 +778,19 @@ impl SnapshotImportWorker { let initial_schemas = schemas_for_import(&mut tx).await?; + let mut components_model = BootstrapComponentsModel::new(&mut tx); + let (_, component_id) = components_model + .must_component_path_to_ids(&component_path) + .with_context(|| ImportError::ComponentMissing(component_path))?; let objects = match format { ImportFormat::Csv(table_name) => { - remap_empty_string_by_schema(table_name, &mut tx, objects).await? + remap_empty_string_by_schema( + TableNamespace::from(component_id), + table_name, + &mut tx, + objects, + ) + .await? }, _ => objects, } @@ -2680,11 +2690,12 @@ async fn table_number_for_import( } async fn remap_empty_string_by_schema<'a, RT: Runtime>( + namespace: TableNamespace, table_name: TableName, tx: &mut Transaction, objects: BoxStream<'a, anyhow::Result>, ) -> anyhow::Result>> { - if let Some((_, schema)) = SchemaModel::new(tx, TableNamespace::by_component_TODO()) + if let Some((_, schema)) = SchemaModel::new(tx, namespace) .get_by_state(SchemaState::Active) .await? {