Skip to content

Commit

Permalink
fix(introspection): use memory, not files
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronArinder committed Sep 18, 2024
1 parent c94d8ee commit 098e598
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 101 deletions.
2 changes: 1 addition & 1 deletion src/command/dev/introspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rover_std::Style;
use crate::command::dev::protocol::{SubgraphSdl, SubgraphUrl};
use crate::command::graph::Introspect as GraphIntrospect;
use crate::command::subgraph::Introspect as SubgraphIntrospect;
use crate::options::IntrospectOpts;
use crate::options::{IntrospectOpts, OutputOpts};

Check warning on line 10 in src/command/dev/introspect.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

unused import: `OutputOpts`
use crate::{RoverError, RoverErrorSuggestion, RoverResult};

#[derive(Clone, Debug)]
Expand Down
12 changes: 6 additions & 6 deletions src/command/subgraph/introspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Introspect {
retry_period: Option<Duration>,
) -> RoverResult<RoverOutput> {
if self.opts.watch {
self.exec_and_watch(&client, output_opts, retry_period)
self.exec_and_watch(&client, &output_opts, retry_period)
.await
} else {
let sdl = self.exec(&client, true, retry_period).await?;
Expand All @@ -49,11 +49,11 @@ impl Introspect {
}
};

Ok(
introspect::run(SubgraphIntrospectInput { headers }, &client, should_retry)
.await?
.result,
)
let sdl = introspect::run(SubgraphIntrospectInput { headers }, &client, should_retry)
.await?
.result;

Ok(sdl)
}

pub async fn exec_and_watch(
Expand Down
12 changes: 5 additions & 7 deletions src/composition/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
subtask::{Subtask, SubtaskRunUnit},
watcher::{
file::FileWatcher,
subgraph_config::{SubgraphConfigWatcher, SubgraphConfigWatcherKind},
subgraph_config::{SubgraphWatcher, SubgraphWatcherKind},

Check warning on line 11 in src/composition/runner.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

unused import: `SubgraphWatcherKind`
supergraph_config::SupergraphConfigWatcher,
},
},
Expand Down Expand Up @@ -56,25 +56,23 @@ impl Runner {
}
}));

// Create subgraph config watchers.
// Create subgraph watchers.
for (subgraph, subgraph_config) in supergraph_config.into_iter() {
// Create a new watcher kind.
let watcher_kind: SubgraphConfigWatcherKind = match subgraph_config.schema.try_into() {
let subgraph_watcher: SubgraphWatcher = match subgraph_config.schema.try_into() {
Ok(kind) => kind,
Err(err) => {
errln!("skipping subgraph {subgraph}: {err}");
continue;
}
};

// Construct a subgraph config watcher from the file watcher kind.
let watcher = SubgraphConfigWatcher::new(watcher_kind, &subgraph);
// Create and run the file watcher in a sub task.
let (mut stream, subtask) = Subtask::new(watcher);
let (mut stream, subtask) = Subtask::new(subgraph_watcher);
subtask.run();

let task = tokio::task::spawn(async move {
while let Some(_) = stream.next().await {
// TODO: emit composition events
eprintln!("subgraph update: {subgraph}");
}
});
Expand Down
172 changes: 87 additions & 85 deletions src/composition/watchers/watcher/subgraph_config.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,93 @@
use std::{fs::OpenOptions, marker::Send, pin::Pin};
use std::{marker::Send, pin::Pin};

use anyhow::anyhow;
use apollo_federation_types::config::SchemaSource;
use camino::Utf8PathBuf;
use futures::{Stream, StreamExt};
use tap::TapFallible;
use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task::AbortHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::{
cli::RoverOutputFormatKind,
command::subgraph::introspect::Introspect as SubgraphIntrospect,
composition::{types::SubgraphUrl, watchers::subtask::SubtaskHandleUnit},
options::{IntrospectOpts, OutputOpts},
options::{IntrospectOpts, OutputChannelKind, OutputOpts},
};

use super::file::FileWatcher;

/// A subgraph watcher watches subgraphs for changes. It's important to know when a subgraph
/// changes because it informs any listeners that they may need to react (eg, by recomposing when
/// the listener is composition)
pub struct SubgraphWatcher {
/// The kind of watcher used (eg, file, introspection)
watcher: SubgraphWatcherKind,
}

/// The kind of watcher attached to the subgraph. This may be either file watching, when we're
/// paying attention to a particular subgraph's SDL file, or introspection, when we get the SDL by
/// polling an endpoint that has introspection enabled
#[derive(Debug, Clone)]
pub enum SubgraphConfigWatcherKind {
pub enum SubgraphWatcherKind {
/// Watch a file on disk.
File(FileWatcher),
/// Poll an endpoint via introspection.
Introspect(SubgraphIntrospection),
/// Don't ever update, schema is only pulled once.
// TODO: figure out what to do with this; is it ever used? can we remove it?
_Once(String),
}

impl SubgraphConfigWatcherKind {
async fn watch(&self, subgraph_name: &str) -> Pin<Box<dyn Stream<Item = String> + Send>> {
match self {
Self::File(file_watcher) => file_watcher.clone().watch(),
Self::Introspect(introspection) => introspection.watch(subgraph_name).await.watch(),
Self::_Once(_) => todo!(),
}
}
}

impl TryFrom<SchemaSource> for SubgraphConfigWatcherKind {
// FIXME: anyhow error -> bespoke error with impl From to rovererror or whatever
impl TryFrom<SchemaSource> for SubgraphWatcher {
type Error = anyhow::Error;

// SchemaSource comes from Apollo Federation types. Importantly, it strips comments and
// directives from introspection (but not when the source is a file)
fn try_from(schema_source: SchemaSource) -> Result<Self, Self::Error> {
match schema_source {
SchemaSource::File { file } => Ok(Self::File(FileWatcher::new(file))),
SchemaSource::File { file } => {
println!("wtf?");

Ok(Self {
watcher: SubgraphWatcherKind::File(FileWatcher::new(file)),
})
}
SchemaSource::SubgraphIntrospection {
subgraph_url,
introspection_headers,
} => Ok(Self::Introspect(SubgraphIntrospection::new(
subgraph_url,
introspection_headers.map(|header_map| header_map.into_iter().collect()),
))),
// SDL (stdin? not sure) / Subgraph (ie, from graph-ref)
} => Ok(Self {
watcher: SubgraphWatcherKind::Introspect(SubgraphIntrospection::new(
subgraph_url,
introspection_headers.map(|header_map| header_map.into_iter().collect()),
)),
}),
// TODO: figure out if there are any other sources to worry about; SDL (stdin? not sure) / Subgraph (ie, from graph-ref)
unsupported_source => Err(anyhow!(
"unsupported subgraph introspection source: {unsupported_source:?}"
)),
}
}
}

impl SubgraphWatcherKind {
/// Watch the subgraph for changes based on the kind of watcher attached
///
/// Development note: this is a stream of Strings, but in the future we might want something
/// more flexible to get type safety
async fn watch(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
match self {
Self::File(file_watcher) => file_watcher.clone().watch(),
Self::Introspect(introspection) => introspection.watch(),
// TODO: figure out what this is; sdl? stdin one-off? either way, probs not watching
Self::_Once(_) => unimplemented!(),
}
}
}

/// Subgraph introspection
#[derive(Debug, Clone)]
pub struct SubgraphIntrospection {
endpoint: SubgraphUrl,
Expand All @@ -70,94 +101,65 @@ impl SubgraphIntrospection {
Self { endpoint, headers }
}

async fn watch(&self, subgraph_name: &str) -> FileWatcher {
//FIXME: unwrap removed
// TODO: does this re-use tmp dirs? or, what? don't want errors second time we run
// TODO: clean up after?
let tmp_dir = tempfile::Builder::new().tempdir().unwrap();
let tmp_config_dir_path = Utf8PathBuf::try_from(tmp_dir.into_path()).unwrap();

// NOTE: this assumes subgraph names are unique; are they?
let tmp_introspection_file = tmp_config_dir_path.join(subgraph_name);

let _ = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(tmp_introspection_file.clone())
// FIXME: unwrap
.unwrap();

let output_opts = OutputOpts {
format_kind: RoverOutputFormatKind::default(),
output_file: Some(tmp_introspection_file.clone()),
};

// TODO: better typing so that it's over some impl, not string; makes all watch() fns require
// returning a string
fn watch(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
let client = reqwest::Client::new();
let endpoint = self.endpoint.clone();
let headers = self.headers.clone();

let (tx, rx) = unbounded_channel();
let rx_stream = UnboundedReceiverStream::new(rx);

// Spawn a tokio task in the background to watch for subgraph changes
tokio::spawn(async move {
// TODO: handle errors?
let _ = SubgraphIntrospect {
opts: IntrospectOpts {
endpoint,
headers,
// TODO impl retries (at least for dev from cli flag)
watch: true,
},
}
.run(client, &output_opts, None)
.await
.map_err(|err| anyhow!(err));
.run(
client,
&OutputOpts {
format_kind: RoverOutputFormatKind::default(),
output_file: None,
// Attach a transmitter to stream back any subgraph changes
channel: Some(tx),
},
// TODO: impl retries (at least for dev from cli flag)
None,
)
.await;
});

FileWatcher::new(tmp_introspection_file)
}
}

pub struct SubgraphConfigWatcher {
subgraph_name: String,
watcher: SubgraphConfigWatcherKind,
}

impl SubgraphConfigWatcher {
// not sure we need the subgraph config?
pub fn new(watcher: SubgraphConfigWatcherKind, subgraph_name: &str) -> Self {
Self {
watcher,
subgraph_name: subgraph_name.to_string(),
}
// Stream any subgraph changes, filtering out empty responses (None) while passing along
// the sdl changes
rx_stream
.filter_map(|change| async move {
match change {
OutputChannelKind::Sdl(sdl) => Some(sdl),
}
})
.boxed()
}
}

/// A unit struct denoting a change to a subgraph, used by composition to know whether to recompose
pub struct SubgraphChanged;

impl SubtaskHandleUnit for SubgraphConfigWatcher {
impl SubtaskHandleUnit for SubgraphWatcher {
type Output = SubgraphChanged;

fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle {
tokio::spawn(async move {
while let Some(content) = self.watcher.watch(&self.subgraph_name).await.next().await {
// TODO: fix parsing; see wtf is up
//let parsed_config: Result<SubgraphConfig, serde_yaml::Error> =
// serde_yaml::from_str(&content);
let mut watcher = self.watcher.watch().await;
while let Some(_change) = watcher.next().await {
let _ = sender
.send(SubgraphChanged)
.tap_err(|err| tracing::error!("{:?}", err));

// We're only looking at whether a subgraph has changed, but we won't emit events
// if the subgraph config can't be parsed to fail early for composition
//match parsed_config {
// Ok(_subgraph_config) => {
// let _ = sender
// .send(SubgraphChanged)
// .tap_err(|err| tracing::error!("{:?}", err));
// }
// Err(err) => {
// tracing::error!("Could not parse subgraph config file: {:?}", err);
// errln!("could not parse subgraph config file");
// }
//}
}
})
.abort_handle()
Expand Down
13 changes: 12 additions & 1 deletion src/options/introspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::{
RoverOutput, RoverResult,
};

use super::OutputChannelKind;

#[derive(Debug, Serialize, Deserialize, Parser)]
pub struct IntrospectOpts {
/// The endpoint of the subgraph to introspect
Expand Down Expand Up @@ -48,8 +50,13 @@ impl IntrospectOpts {
}

if was_updated {
let output = RoverOutput::Introspection(sdl.to_string());
let sdl = sdl.to_string();
let output = RoverOutput::Introspection(sdl.clone());
let _ = output.write_or_print(output_opts).map_err(|e| e.print());
if let Some(channel) = &output_opts.channel {
// TODO: error handling
let _ = channel.send(OutputChannelKind::Sdl(sdl));
}
}
last_result = Some(sdl);
}
Expand All @@ -63,6 +70,10 @@ impl IntrospectOpts {
}
if was_updated {
let _ = error.write_or_print(output_opts).map_err(|e| e.print());
if let Some(channel) = &output_opts.channel {
// TODO: error handling
let _ = channel.send(OutputChannelKind::Sdl(e.clone()));
}
}
last_result = Some(e);
}
Expand Down
13 changes: 12 additions & 1 deletion src/options/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use serde::Serialize;
use serde_json::{json, Value};

use rover_std::{Fs, Style};
use tokio::sync::mpsc::UnboundedSender;

use crate::{cli::RoverOutputFormatKind, RoverError, RoverOutput, RoverResult};

Expand Down Expand Up @@ -77,7 +78,13 @@ impl RoverPrinter for RoverError {
}
}

#[derive(Debug, Parser, Serialize)]
/// The output expected by the channel used for OutputOpts
pub enum OutputChannelKind {
/// SDL as a String, often via introspection
Sdl(String),
}

#[derive(Debug, Parser, Serialize, Default)]
pub struct OutputOpts {
/// Specify Rover's format type
#[arg(long = "format", global = true, default_value_t)]
Expand All @@ -86,6 +93,10 @@ pub struct OutputOpts {
/// Specify a file to write Rover's output to
#[arg(long = "output", short = 'o', global = true, value_parser = Self::parse_absolute_path)]
pub output_file: Option<Utf8PathBuf>,

#[arg(skip)]
#[serde(skip_serializing)]
pub channel: Option<UnboundedSender<OutputChannelKind>>,
}

impl OutputOpts {
Expand Down

0 comments on commit 098e598

Please sign in to comment.