Skip to content

Commit

Permalink
Improve schema checking path for deploy2 (#29394)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: d244fa200dc2687b066abc5439e222679192589a
  • Loading branch information
sujayakar authored and Convex, Inc. committed Aug 31, 2024
1 parent 036e1b3 commit 64b5093
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions crates/application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ sync_types = { package = "convex_sync_types", path = "../../crates/convex/sync_t
tempfile = { workspace = true }
thiserror = { workspace = true }
thousands = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
usage_tracking = { path = "../../crates/usage_tracking" }
value = { path = "../value" }
vector = { path = "../vector" }

[dev-dependencies]
authentication = { path = "../../crates/authentication", features = ["testing"] }
authentication = { path = "../../crates/authentication", features = [
"testing",
] }
common = { path = "../common", features = ["testing"] }
database = { path = "../database", features = ["testing"] }
errors = { path = "../errors", features = ["testing"] }
Expand All @@ -89,7 +92,9 @@ runtime = { path = "../runtime", features = ["testing"] }
search = { path = "../search", features = ["testing"] }
shape_inference = { path = "../shape_inference", features = ["testing"] }
storage = { path = "../storage", features = ["testing"] }
usage_tracking = { path = "../../crates/usage_tracking", features = ["testing"] }
usage_tracking = { path = "../../crates/usage_tracking", features = [
"testing",
] }
value = { path = "../value", features = ["testing"] }
vector = { path = "../vector", features = ["testing"] }

Expand Down
87 changes: 87 additions & 0 deletions crates/application/src/deploy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,90 @@ pub struct FinishPushDiff {
pub definition_diffs: BTreeMap<ComponentDefinitionPath, ComponentDefinitionDiff>,
pub component_diffs: BTreeMap<ComponentPath, ComponentDiff>,
}

#[derive(Debug)]
pub enum SchemaStatus {
InProgress {
components: BTreeMap<ComponentPath, ComponentSchemaStatus>,
},
Failed {
error: String,
component_path: ComponentPath,
table_name: Option<String>,
},
RaceDetected,
Complete,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum SchemaStatusJson {
#[serde(rename_all = "camelCase")]
InProgress {
components: BTreeMap<String, ComponentSchemaStatusJson>,
},
#[serde(rename_all = "camelCase")]
Failed {
error: String,
component_path: String,
table_name: Option<String>,
},
RaceDetected,
Complete,
}

impl From<SchemaStatus> for SchemaStatusJson {
fn from(value: SchemaStatus) -> Self {
match value {
SchemaStatus::InProgress { components } => SchemaStatusJson::InProgress {
components: components
.into_iter()
.map(|(k, v)| (String::from(k), v.into()))
.collect(),
},
SchemaStatus::Failed {
error,
component_path,
table_name,
} => SchemaStatusJson::Failed {
error,
component_path: String::from(component_path),
table_name,
},
SchemaStatus::RaceDetected => SchemaStatusJson::RaceDetected,
SchemaStatus::Complete => SchemaStatusJson::Complete,
}
}
}

#[derive(Debug)]
pub struct ComponentSchemaStatus {
pub schema_validation_complete: bool,
pub indexes_complete: usize,
pub indexes_total: usize,
}

impl ComponentSchemaStatus {
pub fn is_complete(&self) -> bool {
self.schema_validation_complete && self.indexes_complete == self.indexes_total
}
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ComponentSchemaStatusJson {
pub schema_validation_complete: bool,
pub indexes_complete: usize,
pub indexes_total: usize,
}

impl From<ComponentSchemaStatus> for ComponentSchemaStatusJson {
fn from(value: ComponentSchemaStatus) -> Self {
Self {
schema_validation_complete: value.schema_validation_complete,
indexes_complete: value.indexes_complete,
indexes_total: value.indexes_total,
}
}
}
72 changes: 45 additions & 27 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
use std::{
collections::{
BTreeMap,
BTreeSet,
HashSet,
},
ops::Bound,
sync::Arc,
time::SystemTime,
time::{
Duration,
SystemTime,
},
};

use anyhow::Context;
Expand Down Expand Up @@ -117,7 +119,9 @@ use database::{
WriteSource,
};
use deploy_config::{
ComponentSchemaStatus,
FinishPushDiff,
SchemaStatus,
StartPushResponse,
};
use errors::{
Expand Down Expand Up @@ -1903,10 +1907,12 @@ impl<RT: Runtime> Application<RT> {
&self,
identity: Identity,
schema_change: SchemaChange,
) -> anyhow::Result<()> {
timeout: Duration,
) -> anyhow::Result<SchemaStatus> {
let deadline = self.runtime().monotonic_now() + timeout;
loop {
let mut tx = self.begin(identity.clone()).await?;
let mut waiting = BTreeSet::new();
let mut components_status = BTreeMap::new();

for (component_path, schema_id) in &schema_change.schema_ids {
let Some(schema_id) = schema_id else {
Expand All @@ -1922,24 +1928,18 @@ impl<RT: Runtime> Application<RT> {
.await?
.context("Missing schema document")?;
let SchemaMetadata { state, .. } = document.into_value().0.try_into()?;
let is_pending = match state {
SchemaState::Pending => true,
SchemaState::Active | SchemaState::Validated => false,
let schema_validation_complete = match state {
SchemaState::Pending => false,
SchemaState::Active | SchemaState::Validated => true,
SchemaState::Failed { error, table_name } => {
let msg = match table_name {
Some(t) => format!("Schema for table `{t}` failed: {error}"),
None => format!("Schema failed: {error}"),
};
anyhow::bail!(ErrorMetadata::bad_request("SchemaFailed", msg))
return Ok(SchemaStatus::Failed {
error,
component_path: component_path.clone(),
table_name,
});
},
SchemaState::Overwritten => anyhow::bail!(ErrorMetadata::bad_request(
"RaceDetected",
"Push aborted since another push has been started."
)),
SchemaState::Overwritten => return Ok(SchemaStatus::RaceDetected),
};
if is_pending {
waiting.insert(component_path.clone());
}

let component_id = if component_path.is_root() {
ComponentId::Root
Expand All @@ -1956,27 +1956,45 @@ impl<RT: Runtime> Application<RT> {
ComponentId::Child(internal_id)
};
let namespace = TableNamespace::from(component_id);
let mut indexes_complete = 0;
let mut indexes_total = 0;
for index in IndexModel::new(&mut tx)
.get_application_indexes(namespace)
.await?
{
if index.config.is_backfilling() {
waiting.insert(component_path.clone());
if !index.config.is_backfilling() {
indexes_complete += 1;
}
indexes_total += 1;
}
components_status.insert(
component_path.clone(),
ComponentSchemaStatus {
schema_validation_complete,
indexes_complete,
indexes_total,
},
);
}

if waiting.is_empty() {
break;
if components_status.values().all(|c| c.is_complete()) {
return Ok(SchemaStatus::Complete);
}

tracing::info!("Waiting for schema changes to complete for {waiting:?}...");

let now = self.runtime().monotonic_now();
if now > deadline {
return Ok(SchemaStatus::InProgress {
components: components_status,
});
}
let token = tx.into_token()?;
let subscription = self.subscribe(token).await?;
subscription.wait_for_invalidation().await;

tokio::select! {
_ = subscription.wait_for_invalidation() => {},
_ = self.runtime.wait(deadline.clone() - now) => {},
}
}
Ok(())
}

pub async fn finish_push(
Expand Down
21 changes: 18 additions & 3 deletions crates/application/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ use value::{

use crate::{
cron_jobs::CronJobExecutor,
deploy_config::StartPushRequest,
deploy_config::{
SchemaStatus,
StartPushRequest,
},
log_visibility::AllowLogging,
scheduled_jobs::{
ScheduledJobExecutor,
Expand Down Expand Up @@ -321,8 +324,20 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
async fn load_component_tests_modules(&self, layout: &str) -> anyhow::Result<()> {
let request = Self::load_start_push_request(Path::new(layout))?;
let start_push = self.start_push(request).await?;
self.wait_for_schema(Identity::system(), start_push.schema_change.clone())
.await?;
loop {
let schema_status = self
.wait_for_schema(
Identity::system(),
start_push.schema_change.clone(),
Duration::from_secs(10),
)
.await?;
match schema_status {
SchemaStatus::InProgress { .. } => continue,
SchemaStatus::Complete => break,
_ => anyhow::bail!("Unexpected schema status: {schema_status:?}"),
}
}
self.finish_push(start_push, false).await?;
Ok(())
}
Expand Down
32 changes: 27 additions & 5 deletions crates/local_backend/src/deploy_config2.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::collections::BTreeMap;
use std::{
collections::BTreeMap,
time::Duration,
};

use application::deploy_config::{
FinishPushDiff,
SchemaStatusJson,
StartPushRequest,
StartPushResponse,
};
Expand Down Expand Up @@ -163,11 +167,28 @@ pub async fn start_push(
Ok(Json(SerializedStartPushResponse::try_from(resp)?))
}

const DEFAULT_SCHEMA_TIMEOUT_MS: u32 = 10_000;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WaitForSchemaRequest {
admin_key: String,
schema_change: SerializedSchemaChange,
timeout_ms: Option<u32>,
}

#[derive(Serialize)]
#[serde(tag = "type")]
pub enum WaitForSchemaResponse {
InProgress {
status: SchemaStatusJson,
},
Failed {
error: String,
table_name: Option<String>,
},
RaceDetected,
Complete,
}

#[debug_handler]
Expand All @@ -181,12 +202,13 @@ pub async fn wait_for_schema(
req.admin_key,
)
.await?;

let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(DEFAULT_SCHEMA_TIMEOUT_MS) as u64);
let schema_change = req.schema_change.try_into()?;
st.application
.wait_for_schema(identity, schema_change)
let resp = st
.application
.wait_for_schema(identity, schema_change, timeout)
.await?;
Ok(Json(()))
Ok(Json(SchemaStatusJson::from(resp)))
}

#[derive(Deserialize)]
Expand Down
9 changes: 7 additions & 2 deletions npm-packages/convex/src/cli/lib/codegen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ export async function doInitialComponentCodegen(
ctx: Context,
tmpDir: TempDir,
componentDirectory: ComponentDirectory,
opts?: { dryRun?: boolean; generateCommonJSApi?: boolean; debug?: boolean },
opts?: {
dryRun?: boolean;
generateCommonJSApi?: boolean;
debug?: boolean;
verbose?: boolean;
},
) {
const { projectConfig } = await readProjectConfig(ctx);

Expand All @@ -156,7 +161,7 @@ export async function doInitialComponentCodegen(
const isPublishedPackage =
componentDirectory.definitionPath.endsWith(".js") &&
!componentDirectory.isRoot;
if (isPublishedPackage) {
if (isPublishedPackage && opts?.verbose) {
logMessage(
ctx,
`skipping initial codegen for installed package ${componentDirectory.path}`,
Expand Down
Loading

0 comments on commit 64b5093

Please sign in to comment.