Skip to content

Commit

Permalink
run SchemaWorker in all namespaces (#26817)
Browse files Browse the repository at this point in the history
run schema worker in every namespace that has a `_schemas` table

GitOrigin-RevId: e1a9776a68c7c9bed8300f682c62f41c30bc28f5
  • Loading branch information
ldanilek authored and Convex, Inc. committed Jun 7, 2024
1 parent b162768 commit 00f5ec1
Showing 1 changed file with 45 additions and 30 deletions.
75 changes: 45 additions & 30 deletions crates/application/src/schema_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use database::{
IndexModel,
SchemaModel,
Transaction,
SCHEMAS_TABLE,
};
use errors::ErrorMetadataAnyhowExt;
use futures::{
Expand All @@ -25,7 +26,6 @@ use metrics::{
log_document_validated,
schema_validation_timer,
};
use value::TableNamespace;

use crate::metrics::log_worker_starting;

Expand Down Expand Up @@ -68,36 +68,51 @@ impl<RT: Runtime> SchemaWorker<RT> {
let status = log_worker_starting("SchemaWorker");
let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?;
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
let mut pending_schema_work = None;
if let Some((id, db_schema)) = SchemaModel::new(&mut tx, TableNamespace::Global)
.get_by_state(SchemaState::Pending)
.await?
{
tracing::debug!("SchemaWorker found a pending schema and is validating it...");
let timer = schema_validation_timer();
let table_mapping = tx.table_mapping().namespace(TableNamespace::Global);
let virtual_table_mapping = tx.virtual_table_mapping().clone();

let active_schema = SchemaModel::new(&mut tx, TableNamespace::Global)
.get_by_state(SchemaState::Active)
let mut pending_schema_work = Vec::new();
let namespaces: Vec<_> = tx
.table_mapping()
.iter()
.filter_map(|(_, namespace, _, table_name)| {
if *table_name == *SCHEMAS_TABLE {
Some(namespace)
} else {
None
}
})
.collect();
for namespace in namespaces {
if let Some((id, db_schema)) = SchemaModel::new(&mut tx, namespace)
.get_by_state(SchemaState::Pending)
.await?
.map(|(_id, active_schema)| active_schema);
let ts = tx.begin_timestamp();
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
pending_schema_work = Some((
id,
timer,
table_mapping,
virtual_table_mapping,
db_schema,
ts,
active_schema,
by_id_indexes,
));
{
tracing::debug!("SchemaWorker found a pending schema and is validating it...");
let timer = schema_validation_timer();
let table_mapping = tx.table_mapping().namespace(namespace);
let virtual_table_mapping = tx.virtual_table_mapping().clone();

let active_schema = SchemaModel::new(&mut tx, namespace)
.get_by_state(SchemaState::Active)
.await?
.map(|(_id, active_schema)| active_schema);
let ts = tx.begin_timestamp();
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
pending_schema_work.push((
namespace,
id,
timer,
table_mapping,
virtual_table_mapping,
db_schema,
ts,
active_schema,
by_id_indexes,
));
}
}
let token = tx.into_token()?;

if let Some((
for (
namespace,
id,
timer,
table_mapping,
Expand All @@ -106,7 +121,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
ts,
active_schema,
by_id_indexes,
)) = pending_schema_work
) in pending_schema_work
{
let tables_to_check = DatabaseSchema::tables_to_validate(
&db_schema,
Expand Down Expand Up @@ -141,7 +156,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF);
while backoff.failures() < MAX_COMMIT_FAILURES {
let mut tx = self.database.begin(Identity::system()).await?;
SchemaModel::new(&mut tx, TableNamespace::Global)
SchemaModel::new(&mut tx, namespace)
.mark_failed(id, schema_error.clone())
.await?;
if let Err(e) = self
Expand Down Expand Up @@ -171,7 +186,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
}
}
let mut tx = self.database.begin(Identity::system()).await?;
if let Err(error) = SchemaModel::new(&mut tx, TableNamespace::Global)
if let Err(error) = SchemaModel::new(&mut tx, namespace)
.mark_validated(id)
.await
{
Expand Down

0 comments on commit 00f5ec1

Please sign in to comment.