Skip to content

Commit

Permalink
refactor: Add query resolvers into web UI (#9182)
Browse files Browse the repository at this point in the history
### Description

Adds the query resolvers such as `packages` and `file` into the web UI
GraphQL server. This will allow us to display repo general info in
studio.

You can review this commit by commit

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
NicholasLYang committed Sep 24, 2024
1 parent e4f73a0 commit 6d4e655
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tokio-stream = { version = "0.1.12", features = ["net"] }
tokio-util = { version = "0.7.7", features = ["compat"] }
tonic = { version = "0.11.0", features = ["transport"] }
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["cors"] }
tracing-appender = "0.2.2"
tracing-chrome = "0.7.1"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
Expand Down
10 changes: 7 additions & 3 deletions crates/turborepo-lib/src/commands/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fs;
use std::{fs, sync::Arc};

use async_graphql::{EmptyMutation, EmptySubscription, Schema, ServerError};
use miette::{Diagnostic, Report, SourceSpan};
Expand All @@ -10,7 +10,7 @@ use crate::{
cli::Command,
commands::{run::get_signal, CommandBase},
query,
query::{Error, Query},
query::{Error, RepositoryQuery},
run::builder::RunBuilder,
signal::SignalHandler,
};
Expand Down Expand Up @@ -84,7 +84,11 @@ pub async fn run(
fs::read_to_string(AbsoluteSystemPathBuf::from_unknown(run.repo_root(), query))?
};

let schema = Schema::new(Query::new(run), EmptyMutation, EmptySubscription);
let schema = Schema::new(
RepositoryQuery::new(Arc::new(run)),
EmptyMutation,
EmptySubscription,
);

let result = schema.execute(&query).await;
if result.errors.is_empty() {
Expand Down
12 changes: 7 additions & 5 deletions crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::future::Future;
use std::{future::Future, sync::Arc};

use tracing::error;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand Down Expand Up @@ -40,10 +40,12 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result<i3

let run_fut = async {
let (analytics_sender, analytics_handle) = run_builder.start_analytics();
let mut run = run_builder
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?;
let run = Arc::new(
run_builder
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?,
);

let (sender, handle) = run.start_ui()?.unzip();

Expand Down
18 changes: 11 additions & 7 deletions crates/turborepo-lib/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ pub enum Error {
Path(#[from] turbopath::PathError),
}

pub struct Query {
pub struct RepositoryQuery {
run: Arc<Run>,
}

impl Query {
pub fn new(run: Run) -> Self {
Self { run: Arc::new(run) }
impl RepositoryQuery {
pub fn new(run: Arc<Run>) -> Self {
Self { run }
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ impl PackagePredicate {
}

#[Object]
impl Query {
impl RepositoryQuery {
async fn affected_packages(
&self,
base: Option<String>,
Expand Down Expand Up @@ -339,12 +339,16 @@ impl Query {
}
}

async fn graphiql() -> impl IntoResponse {
pub async fn graphiql() -> impl IntoResponse {
response::Html(GraphiQLSource::build().endpoint("/").finish())
}

pub async fn run_server(run: Run, signal: SignalHandler) -> Result<(), Error> {
let schema = Schema::new(Query::new(run), EmptyMutation, EmptySubscription);
let schema = Schema::new(
RepositoryQuery::new(Arc::new(run)),
EmptyMutation,
EmptySubscription,
);
let app = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));

let subscriber = signal.subscribe().ok_or(Error::NoSignalHandler)?;
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/query/package_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

11 changes: 6 additions & 5 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod scope;
pub(crate) mod summary;
pub mod task_access;
pub mod task_id;
mod ui;
pub mod watch;

use std::{
Expand Down Expand Up @@ -211,7 +212,7 @@ impl Run {
&& tui::terminal_big_enough()?)
}

pub fn start_ui(&self) -> UIResult<UISender> {
pub fn start_ui(self: &Arc<Self>) -> UIResult<UISender> {
// Print prelude here as this needs to happen before the UI is started
if self.should_print_prelude {
self.print_run_prelude();
Expand All @@ -227,10 +228,10 @@ impl Run {
.map(|res| res.map(|(sender, handle)| (UISender::Wui(sender), handle))),
}
}
fn start_web_ui(&self) -> WuiResult {
fn start_web_ui(self: &Arc<Self>) -> WuiResult {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let handle = tokio::spawn(turborepo_ui::wui::server::start_server(rx));
let handle = tokio::spawn(ui::start_web_ui_server(rx, self.clone()));

Ok(Some((WebUISender { tx }, handle)))
}
Expand Down Expand Up @@ -260,7 +261,7 @@ impl Run {
}
}

pub async fn run(&mut self, ui_sender: Option<UISender>, is_watch: bool) -> Result<i32, Error> {
pub async fn run(&self, ui_sender: Option<UISender>, is_watch: bool) -> Result<i32, Error> {
let skip_cache_writes = self.opts.runcache_opts.skip_writes;
if let Some(subscriber) = self.signal_handler.subscribe() {
let run_cache = self.run_cache.clone();
Expand Down Expand Up @@ -356,7 +357,7 @@ impl Run {
self.engine.task_definitions(),
&self.repo_root,
&self.run_telemetry,
&mut self.daemon,
&self.daemon,
)?;

let root_workspace = self
Expand Down
55 changes: 55 additions & 0 deletions crates/turborepo-lib/src/run/ui.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::sync::Arc;

use async_graphql::{EmptyMutation, EmptySubscription, MergedObject, Schema};
use async_graphql_axum::GraphQL;
use axum::{http::Method, routing::get, Router};
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use turborepo_ui::wui::{event::WebUIEvent, server::SharedState};

use crate::{query, query::graphiql, run::Run};

pub async fn start_web_ui_server(
rx: tokio::sync::mpsc::UnboundedReceiver<WebUIEvent>,
run: Arc<Run>,
) -> Result<(), turborepo_ui::Error> {
let state = SharedState::default();
let subscriber = turborepo_ui::wui::subscriber::Subscriber::new(rx);
tokio::spawn(subscriber.watch(state.clone()));

run_server(state.clone(), run).await?;

Ok(())
}

#[derive(MergedObject)]
struct Query(turborepo_ui::wui::RunQuery, query::RepositoryQuery);

async fn run_server(state: SharedState, run: Arc<Run>) -> Result<(), turborepo_ui::Error> {
let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods([Method::GET, Method::POST])
.allow_headers(Any)
// allow requests from any origin
.allow_origin(Any);

let web_ui_query = turborepo_ui::wui::RunQuery::new(state.clone());
let turbo_query = query::RepositoryQuery::new(run);
let combined_query = Query(web_ui_query, turbo_query);

let schema = Schema::new(combined_query, EmptyMutation, EmptySubscription);
let app = Router::new()
.route("/", get(graphiql).post_service(GraphQL::new(schema)))
.layer(cors);

axum::serve(
TcpListener::bind("127.0.0.1:8000")
.await
.map_err(turborepo_ui::wui::Error::Server)?,
app,
)
.await
.map_err(turborepo_ui::wui::Error::Server)?;

Ok(())
}
19 changes: 11 additions & 8 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ChangedPackages {
}

pub struct WatchClient {
run: Run,
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<PersistentRunHandle>,
connector: DaemonConnector,
Expand Down Expand Up @@ -130,9 +130,11 @@ impl WatchClient {
execution_args: execution_args.clone(),
});

let run = RunBuilder::new(new_base)?
.build(&handler, telemetry.clone())
.await?;
let run = Arc::new(
RunBuilder::new(new_base)?
.build(&handler, telemetry.clone())
.await?,
);

let watched_packages = run.get_relevant_packages();

Expand Down Expand Up @@ -288,7 +290,7 @@ impl WatchClient {
let signal_handler = self.handler.clone();
let telemetry = self.telemetry.clone();

let mut run = RunBuilder::new(new_base)?
let run = RunBuilder::new(new_base)?
.with_entrypoint_packages(packages)
.hide_prelude()
.build(&signal_handler, telemetry)
Expand Down Expand Up @@ -331,7 +333,8 @@ impl WatchClient {
self.run = RunBuilder::new(base.clone())?
.hide_prelude()
.build(&self.handler, self.telemetry.clone())
.await?;
.await?
.into();

self.watched_packages = self.run.get_relevant_packages();

Expand All @@ -357,7 +360,7 @@ impl WatchClient {
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let mut persistent_run = self.run.create_run_for_persistent_tasks();
let persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
Expand All @@ -369,7 +372,7 @@ impl WatchClient {
});

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
let non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run(self.ui_sender.clone(), true).await?)
} else {
Ok(self.run.run(self.ui_sender.clone(), true).await?)
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/task_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PackageInputsHashes {
task_definitions: &HashMap<TaskId<'static>, TaskDefinition>,
repo_root: &AbsoluteSystemPath,
telemetry: &GenericEventBuilder,
daemon: &mut Option<DaemonClient<DaemonConnector>>,
daemon: &Option<DaemonClient<DaemonConnector>>,
) -> Result<PackageInputsHashes, Error> {
tracing::trace!(scm_manual=%scm.is_manual(), "scm running in {} mode", if scm.is_manual() { "manual" } else { "git" });

Expand Down
1 change: 0 additions & 1 deletion crates/turborepo-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

tower-http = { version = "0.5.2", features = ["cors"] }
tracing = { workspace = true }
tui-term = { workspace = true }
turbopath = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions crates/turborepo-ui/src/wui/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Web UI for Turborepo. Creates a WebSocket server that can be subscribed to
//! by a web client to display the status of tasks.

mod event;
pub mod event;
pub mod sender;
pub mod server;
mod subscriber;
pub mod subscriber;

use event::WebUIEvent;
pub use server::RunQuery;
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 6d4e655

Please sign in to comment.