Skip to content

Commit

Permalink
wire up composition runner (#2154)
Browse files Browse the repository at this point in the history
  • Loading branch information
dotdat committed Sep 18, 2024
1 parent d52e3c5 commit 61b41d5
Show file tree
Hide file tree
Showing 19 changed files with 381 additions and 373 deletions.
2 changes: 1 addition & 1 deletion src/command/dev/introspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rover_std::Style;
use crate::command::dev::protocol::{SubgraphSdl, SubgraphUrl};
use crate::command::graph::Introspect as GraphIntrospect;
use crate::command::subgraph::Introspect as SubgraphIntrospect;
use crate::options::{IntrospectOpts, OutputOpts};
use crate::options::{IntrospectOpts};
use crate::{RoverError, RoverErrorSuggestion, RoverResult};

#[derive(Clone, Debug)]
Expand Down
64 changes: 52 additions & 12 deletions src/command/supergraph/compose/do_compose.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::env::current_dir;
use std::{fs::File, io::Write, process::Command, str};

use anyhow::{anyhow, Context};
Expand All @@ -11,15 +10,19 @@ use apollo_federation_types::{
use camino::Utf8PathBuf;
use clap::{Args, Parser};
use derive_getters::Getters;
use rover_std::errln;
use semver::Version;
use serde::Serialize;
use std::io::Read;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt;

use rover_client::shared::GraphRef;
use rover_client::RoverClientError;

use crate::composition::supergraph::binary::{OutputTarget, SupergraphBinary};
use crate::composition::supergraph::config::SupergraphConfigResolver;
use crate::composition::supergraph::version::SupergraphVersion;
use crate::utils::supergraph_config::get_supergraph_config;
use crate::utils::{client::StudioClientConfig, parsers::FileDescriptorType};
use crate::{
Expand Down Expand Up @@ -128,21 +131,25 @@ impl Compose {
) -> RoverResult<RoverOutput> {
#[cfg(debug_assertions)]
if self.opts.watch {
let supergraph_config_root = if let Some(FileDescriptorType::File(file_path)) =
// Get the current supergraph config path.
let supergraph_config_path = if let Some(FileDescriptorType::File(file_path)) =
&self.opts.supergraph_config_source.supergraph_yaml
{
file_path
.parent()
.ok_or_else(|| {
anyhow!("Could not get the parent directory of ({})", file_path)
})?
.to_path_buf()
} else {
Utf8PathBuf::try_from(current_dir()?)?
todo!("only start subgraph watchers");
};

let supergraph_config_root = supergraph_config_path
.parent()
.ok_or_else(|| {
anyhow!("Could not get the parent directory of ({supergraph_config_path})")
})?
.to_path_buf();

let studio_client =
client_config.get_authenticated_client(&self.opts.plugin_opts.profile)?;
let internal_supergraph_config_path =
let target_supergraph_config_path =
Utf8PathBuf::from_path_buf(NamedTempFile::new()?.into_temp_path().to_path_buf())
.map_err(|err| {
anyhow!("Unable to convert PathBuf ({:?}) to Utf8PathBuf", err)
Expand All @@ -158,9 +165,42 @@ impl Compose {
.await?
.lazily_resolve_subgraphs(&supergraph_config_root)
.await?
.write(internal_supergraph_config_path)?;
let runner = crate::composition::runner::Runner::new(supergraph_config);
runner.run().await?;
.with_target(target_supergraph_config_path);

let federation_version = supergraph_config.federation_version();
let install_path = self
.maybe_install_supergraph(override_install_path, client_config, federation_version)
.await?;
let exact_federation_version = Self::extract_federation_version(&install_path)?;
let exact_version = match exact_federation_version {
FederationVersion::ExactFedTwo(exact_version) => exact_version,
FederationVersion::ExactFedOne(exact_version) => exact_version,
_ => {
errln!(
"Unable to extract the exact federation version from the supergraph binary from path {}", install_path
);
return Err(anyhow!(
"Unable to extract the exact federation version from the supergraph binary"
)
.into());
}
};
let supergraph_binary = SupergraphBinary::new(
install_path,
SupergraphVersion::new(exact_version),
output_file
.map(OutputTarget::File)
.unwrap_or_else(|| OutputTarget::Stdout),
);
let runner =
crate::composition::runner::Runner::new(supergraph_config, supergraph_binary);
let mut messages = runner.run().await?;
tokio::task::spawn(async move {
while let Some(message) = messages.next().await {
eprintln!("{:?}", message);
}
});

return Ok(RoverOutput::EmptySuccess);
}
let mut supergraph_config = get_supergraph_config(
Expand Down
1 change: 1 addition & 0 deletions src/composition/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{CompositionError, CompositionSuccess};

/// Events emitted from composition
#[derive(Debug)]
pub enum CompositionEvent {
/// The composition has started and may not have finished yet. This is useful for letting users
/// know composition is running
Expand Down
124 changes: 1 addition & 123 deletions src/composition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,9 @@ use apollo_federation_types::{
};
use camino::Utf8PathBuf;
use derive_getters::Getters;
use events::CompositionEvent;
use futures::{stream::BoxStream, StreamExt};
use supergraph::binary::SupergraphBinary;
use tokio::task::AbortHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
use watchers::{
subtask::{Subtask, SubtaskHandleUnit, SubtaskRunUnit},
watcher::{router_config::RouterConfigMessage, supergraph_config::SupergraphConfigDiff},
};

use crate::utils::effect::{exec::TokioCommand, read_file::FsReadFile};

pub mod events;
pub mod run_composition;
pub mod runner;
pub mod supergraph;
pub mod types;
Expand Down Expand Up @@ -53,115 +43,3 @@ pub enum CompositionError {
// (alternatively, we could just reword the error message to allow for either)
},
}

// NB: this is where we'll contain the logic for kicking off watchers
struct Composition {
supergraph_binary: SupergraphBinary,
supergraph_config_events: Option<InputEvent>,
router_config_events: Option<InputEvent>,
}

enum InputEvent {
SupergraphConfig(BoxStream<'static, SupergraphConfigDiff>),
RouterConfig(BoxStream<'static, RouterConfigMessage>),
}

impl Composition {
fn new(supergraph_binary: SupergraphBinary) -> Self {
Self {
supergraph_binary,
supergraph_config_events: None,
router_config_events: None,
}
}

fn with_supergraph_config_events(
&mut self,
supergraph_config_events: BoxStream<'static, SupergraphConfigDiff>,
) -> &mut Self {
self.supergraph_config_events =
Some(InputEvent::SupergraphConfig(supergraph_config_events));
self
}

fn with_router_config_events(
&mut self,
router_config_events: BoxStream<'static, RouterConfigMessage>,
) -> &mut Self {
self.router_config_events = Some(InputEvent::RouterConfig(router_config_events));
self
}

async fn watch(self) -> WatchResultBetterName {
let (composition_events, composition_subtask): (
UnboundedReceiverStream<CompositionEvent>,
Subtask<Composition, CompositionEvent>,
) = Subtask::new(self);

let abort_handle = composition_subtask.run();

WatchResultBetterName {
abort_handle,
composition_events,
}
}
}

struct WatchResultBetterName {
abort_handle: AbortHandle,
composition_events: UnboundedReceiverStream<CompositionEvent>,
}

// NB: this is where we'll bring it all together to actually watch incoming events from watchers to
// decide whether we need to recompose/etc
impl SubtaskHandleUnit for Composition {
type Output = CompositionEvent;

fn handle(
self,
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
) -> tokio::task::AbortHandle {
let mut events = Vec::new();
if let Some(supergraph_config_events) = self.supergraph_config_events {
events.push(supergraph_config_events);
}
if let Some(router_config_events) = self.router_config_events {
events.push(router_config_events);
}

tokio::spawn(async move {
for event_source in events {
match event_source {
InputEvent::SupergraphConfig(mut events) => {
while let Some(event) = events.next().await {
sender.send(CompositionEvent::Started);
let current_supergraph_config = event.current();

// TODO: write current_supergraph_config to a path
match self
.supergraph_binary
.compose(
&TokioCommand::default(),
&FsReadFile::default(),
&Utf8PathBuf::new(),
)
.await
{
Ok(success) => sender.send(CompositionEvent::Success(success)),
Err(failure) => sender.send(CompositionEvent::Error(failure)),
};
}
}
InputEvent::RouterConfig(mut events) => {
while let Some(_event) = events.next().await {
// TODO: nothing, this is just an example of how to handle different
// streams; composition _shouldn't_ run when the router config changes,
// unless I'm mistaken
}
}
}
}
})
.abort_handle()
}
}
62 changes: 62 additions & 0 deletions src/composition/run_composition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use buildstructor::Builder;
use tap::TapFallible;
use tokio_stream::StreamExt;

use crate::utils::effect::{exec::ExecCommand, read_file::ReadFile};

use super::{
events::CompositionEvent,
supergraph::{binary::SupergraphBinary, config::FinalSupergraphConfig},
watchers::{subtask::SubtaskHandleStream, watcher::subgraph::SubgraphChanged},
};

#[derive(Builder)]
pub struct RunComposition<ReadF, ExecC> {
supergraph_config: FinalSupergraphConfig,
supergraph_binary: SupergraphBinary,
exec_command: ExecC,
read_file: ReadF,
}

impl<ReadF, ExecC> SubtaskHandleStream for RunComposition<ReadF, ExecC>
where
ReadF: ReadFile + Clone + Send + Sync + 'static,
ExecC: ExecCommand + Clone + Send + Sync + 'static,
{
type Input = SubgraphChanged;
type Output = CompositionEvent;
fn handle(
self,
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
mut input: futures::stream::BoxStream<'static, Self::Input>,
) -> tokio::task::AbortHandle {
let supergraph_config = self.supergraph_config.clone();
tokio::task::spawn(async move {
while let Some(_) = input.next().await {
// this block makes sure that the read lock is dropped asap
let output = {
let path = supergraph_config.read_lock().await;
let _ = sender
.send(CompositionEvent::Started)
.tap_err(|err| tracing::error!("{:?}", err));
self.supergraph_binary
.compose(&self.exec_command, &self.read_file, &*path)
.await
};
match output {
Ok(success) => {
let _ = sender
.send(CompositionEvent::Success(success))
.tap_err(|err| tracing::error!("{:?}", err));
}
Err(err) => {
let _ = sender
.send(CompositionEvent::Error(err))
.tap_err(|err| tracing::error!("{:?}", err));
}
}
}
})
.abort_handle()
}
}
Loading

0 comments on commit 61b41d5

Please sign in to comment.