Skip to content

Commit

Permalink
[nexus] Add StartSaga::saga_run to await completion
Browse files Browse the repository at this point in the history
Nexus API handlers (and other sagas) that start sagas have the ability
to wait for the completion of the saga by `await`ing the `RunningSaga`
future returned by `RunnableSaga::start`. Background tasks, on the other
hand, cannot currently do this, as their only interface to the saga
executor is an `Arc<dyn StartSaga>`, which provides only the
[`saga_start` method][1]. This method throws away the `RunningSaga`
returned by `RunnableSaga::start`, so the completion of the saga cannot
be awaited. In some cases, it's desirable for background tasks to be
able to await a saga running to completion. I described some motivations
for this in #6569.

This commit adds a new `StartSaga::saga_run` method to the `StartSaga`
trait, which starts a saga *and* waits for it to finish. Since many
tests use a `NoopStartSaga` type which doesn't actually start sagas,
this interface still throws away most of the saga *outputs* provided by
`StoppedSaga`, to make it easier for the noop test implementation to
implement this method. If that's an issue in the future, we can revisit
the interface, and maybe make `NoopStartSaga` return fake saga results
or something.

I've updated the `instance_watcher` and `instance_updater` background
tasks to use the `saga_run` method, because they were written with the
intent to spawn tasks that run sagas to completion --- this is necessary
for how the number of concurrent update sagas is *supposed* to be
limited by `instance_watcher`. I left the region-replacement tasks using
`StartSaga::saga_start`, because --- as far as I can tell --- the "fire
and forget" behavior is desirable there. Perhaps @jmpesp can confirm
this?

Closes #6569

[1]: https://github.com/oxidecomputer/omicron/blob/8be99b0c0dd18495d4a98187145961eafdb17d8f/nexus/src/app/saga.rs#L96-L108
  • Loading branch information
hawkw committed Sep 13, 2024
1 parent 4c72357 commit 2f9ec4d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 3 deletions.
13 changes: 13 additions & 0 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ fn init_dns(
#[cfg(test)]
pub mod test {
use crate::app::saga::StartSaga;
use crate::app::saga::StartSagaCompletionFuture;
use dropshot::HandlerTaskMode;
use futures::FutureExt;
use nexus_db_model::DnsGroup;
Expand Down Expand Up @@ -953,6 +954,18 @@ pub mod test {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
async { Ok(()) }.boxed()
}

fn saga_run(
&self,
dag: steno::SagaDag,
) -> futures::prelude::future::BoxFuture<
'_,
Result<(), omicron_common::api::external::Error>,
> {
// Because we don't actually run sagas, this is equivalent to
// `saga_start`.
self.saga_start(dag)
}
}

type ControlPlaneTestContext =
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/app/background/tasks/instance_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ impl InstanceUpdater {
let start_saga = self.sagas.clone();
sagas.spawn(async move {
start_saga
.saga_start(saga)
// Start the saga and wait for it to complete
.saga_run(saga)
.await
.map_err(|e| (instance_id, e))
});
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/app/background/tasks/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl InstanceWatcher {
}
Ok(Some((_, saga))) => {
check.update_saga_queued = true;
if let Err(e) = sagas.saga_start(saga).await {
if let Err(e) = sagas.saga_run(saga).await {
warn!(opctx.log, "update saga failed"; "error" => ?e);
check.result = Err(Incomplete::UpdateFailed);
}
Expand Down
2 changes: 2 additions & 0 deletions nexus/src/app/background/tasks/region_replacement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ impl RegionReplacementDetector {
};

let saga_dag = SagaRegionReplacementStart::prepare(&params)?;
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ impl RegionSnapshotReplacementGarbageCollect {

let saga_dag =
SagaRegionSnapshotReplacementGarbageCollect::prepare(&params)?;
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ impl RegionSnapshotReplacementDetector {
};

let saga_dag = SagaRegionSnapshotReplacementStart::prepare(&params)?;
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ impl RegionSnapshotReplacementFindAffected {
};

let saga_dag = SagaRegionSnapshotReplacementStep::prepare(&params)?;
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await
}

Expand Down Expand Up @@ -89,7 +91,12 @@ impl RegionSnapshotReplacementFindAffected {

let saga_dag =
SagaRegionSnapshotReplacementStepGarbageCollect::prepare(&params)?;
self.sagas.saga_start(saga_dag).await

// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
// Not waiting for completion does *not* cancel the saga itself.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}

async fn clean_up_region_snapshot_replacement_step_volumes(
Expand Down
28 changes: 28 additions & 0 deletions nexus/src/app/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ pub(crate) trait StartSaga: Send + Sync {
/// Create a new saga from the provided `dag` and start it running, but do
/// not wait for it to finish.
///
/// The returned future resolves to `Ok(())` or an [`Error`]; none of the
/// saga's outputs are made available through this method. Use
/// `saga_run` instead when the caller needs to wait for the saga to
/// finish.
fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>>;

/// Create a new saga from the provided `dag`, start it running, and wait
/// for it to finish.
///
/// The returned future resolves to `Ok(())` or an [`Error`]; the saga's
/// outputs are not made available through this method. Use `saga_start`
/// instead when the caller only needs the saga to be started.
fn saga_run(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>>;
}

impl StartSaga for SagaExecutor {
Expand All @@ -101,10 +105,34 @@ impl StartSaga for SagaExecutor {
// complete. We don't need that here. (Cancelling this has no
// effect on the running saga.)
let _ = runnable_saga.start().await?;

Ok(())
}
.boxed()
}

/// Prepare the saga described by `dag`, start it, and wait until it has
/// stopped.
///
/// # Errors
///
/// Returns an [`Error`] if preparing or starting the saga fails, or if the
/// stopped saga's result converts to an error via `into_omicron_result`.
fn saga_run(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>> {
    async move {
        // Step 1: turn the DAG into a saga that is ready to run.
        let runnable_saga = self.saga_prepare(dag).await?;

        // Step 2: kick it off. The value returned here is what lets us
        // wait for the saga to finish.
        let running_saga = runnable_saga.start().await?;

        // Step 3: wait for the saga to stop.
        let stopped_saga = running_saga.wait_until_stopped().await;

        // Step 4: keep only success/failure. The saga's outputs, saga log,
        // etc. are deliberately discarded, because some tests rely on a
        // `NoopStartSaga` implementation that doesn't actually run sagas
        // and therefore cannot produce a real saga log or outputs.
        stopped_saga.into_omicron_result().map(|_| ())
    }
    .boxed()
}
}

/// Handle to a self-contained subsystem for kicking off sagas
Expand Down

0 comments on commit 2f9ec4d

Please sign in to comment.