From 00f5ec15f8a5bfe044ec3d7548b139b2ce8d9710 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Thu, 6 Jun 2024 20:32:00 -0400 Subject: [PATCH] run SchemaWorker in all namespaces (#26817) run schema worker in every namespace that has a `_schemas` table GitOrigin-RevId: e1a9776a68c7c9bed8300f682c62f41c30bc28f5 --- crates/application/src/schema_worker/mod.rs | 75 ++++++++++++--------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/crates/application/src/schema_worker/mod.rs b/crates/application/src/schema_worker/mod.rs index a8ef560d..428700db 100644 --- a/crates/application/src/schema_worker/mod.rs +++ b/crates/application/src/schema_worker/mod.rs @@ -12,6 +12,7 @@ use database::{ IndexModel, SchemaModel, Transaction, + SCHEMAS_TABLE, }; use errors::ErrorMetadataAnyhowExt; use futures::{ @@ -25,7 +26,6 @@ use metrics::{ log_document_validated, schema_validation_timer, }; -use value::TableNamespace; use crate::metrics::log_worker_starting; @@ -68,36 +68,51 @@ impl SchemaWorker { let status = log_worker_starting("SchemaWorker"); let mut tx: Transaction = self.database.begin(Identity::system()).await?; let snapshot = self.database.snapshot(tx.begin_timestamp())?; - let mut pending_schema_work = None; - if let Some((id, db_schema)) = SchemaModel::new(&mut tx, TableNamespace::Global) - .get_by_state(SchemaState::Pending) - .await? - { - tracing::debug!("SchemaWorker found a pending schema and is validating it..."); - let timer = schema_validation_timer(); - let table_mapping = tx.table_mapping().namespace(TableNamespace::Global); - let virtual_table_mapping = tx.virtual_table_mapping().clone(); - - let active_schema = SchemaModel::new(&mut tx, TableNamespace::Global) - .get_by_state(SchemaState::Active) + let mut pending_schema_work = Vec::new(); + let namespaces: Vec<_> = tx + .table_mapping() + .iter() + .filter_map(|(_, namespace, _, table_name)| { + if *table_name == *SCHEMAS_TABLE { + Some(namespace) + } else { + None + } + }) + .collect(); + for namespace in namespaces { + if let Some((id, db_schema)) = SchemaModel::new(&mut tx, namespace) + .get_by_state(SchemaState::Pending) .await? - .map(|(_id, active_schema)| active_schema); - let ts = tx.begin_timestamp(); - let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; - pending_schema_work = Some(( - id, - timer, - table_mapping, - virtual_table_mapping, - db_schema, - ts, - active_schema, - by_id_indexes, - )); + { + tracing::debug!("SchemaWorker found a pending schema and is validating it..."); + let timer = schema_validation_timer(); + let table_mapping = tx.table_mapping().namespace(namespace); + let virtual_table_mapping = tx.virtual_table_mapping().clone(); + + let active_schema = SchemaModel::new(&mut tx, namespace) + .get_by_state(SchemaState::Active) + .await? + .map(|(_id, active_schema)| active_schema); + let ts = tx.begin_timestamp(); + let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; + pending_schema_work.push(( + namespace, + id, + timer, + table_mapping, + virtual_table_mapping, + db_schema, + ts, + active_schema, + by_id_indexes, + )); + } } let token = tx.into_token()?; - if let Some(( + for ( + namespace, id, timer, table_mapping, @@ -106,7 +121,7 @@ impl SchemaWorker { ts, active_schema, by_id_indexes, - )) = pending_schema_work + ) in pending_schema_work { let tables_to_check = DatabaseSchema::tables_to_validate( &db_schema, @@ -141,7 +156,7 @@ impl SchemaWorker { let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF); while backoff.failures() < MAX_COMMIT_FAILURES { let mut tx = self.database.begin(Identity::system()).await?; - SchemaModel::new(&mut tx, TableNamespace::Global) + SchemaModel::new(&mut tx, namespace) .mark_failed(id, schema_error.clone()) .await?; if let Err(e) = self @@ -171,7 +186,7 @@ impl SchemaWorker { } } let mut tx = self.database.begin(Identity::system()).await?; - if let Err(error) = SchemaModel::new(&mut tx, TableNamespace::Global) + if let Err(error) = SchemaModel::new(&mut tx, namespace) .mark_validated(id) .await {