Skip to content

Commit

Permalink
Merge pull request #17 from DeterminateSystems/eelcodolstra/fh-218-cl…
Browse files Browse the repository at this point in the history
…ean-up-the-magic-nix-cache-priv-backend-to-be-published

Get rid of unwraps/expects
  • Loading branch information
edolstra committed Feb 26, 2024
2 parents 75b1450 + 1f46e11 commit 308fa51
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 79 deletions.
16 changes: 9 additions & 7 deletions magic-nix-cache/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ async fn workflow_finish(
Extension(state): Extension<State>,
) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished");

let original_paths = state.original_paths.lock().await;
let final_paths = get_store_paths(&state.store).await?;
let new_paths = final_paths
Expand All @@ -63,12 +62,15 @@ async fn workflow_finish(
upload_paths(new_paths.clone(), &store_uri).await?;
}

let sender = state.shutdown_sender.lock().await.take().unwrap();
sender.send(()).unwrap();
if let Some(sender) = state.shutdown_sender.lock().await.take() {
sender
.send(())
.map_err(|_| Error::Internal("Sending shutdown server message".to_owned()))?;

// Wait for the Attic push workers to finish.
if let Some(attic_state) = state.flakehub_state.write().await.take() {
attic_state.push_session.wait().await.unwrap();
// Wait for the Attic push workers to finish.
if let Some(attic_state) = state.flakehub_state.write().await.take() {
attic_state.push_session.wait().await?;
}
}

let reply = WorkflowFinishResponse {
Expand All @@ -93,7 +95,7 @@ fn make_store_uri(self_endpoint: &SocketAddr) -> String {
.authority(self_endpoint.to_string())
.path_and_query("/?compression=zstd&parallel-compression=true")
.build()
.unwrap()
.expect("Cannot construct URL to self")
.to_string()
}

Expand Down
6 changes: 6 additions & 0 deletions magic-nix-cache/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ pub enum Error {

#[error("Bad URL")]
BadUrl(reqwest::Url),

#[error("Configuration error: {0}")]
Config(String),

#[error("Internal error: {0}")]
Internal(String),
}

impl IntoResponse for Error {
Expand Down
35 changes: 23 additions & 12 deletions magic-nix-cache/src/flakehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn init_cache(
netrc_rs::Netrc::parse(netrc_contents, false).map_err(Error::Netrc)?
};

let netrc_entry = {
let flakehub_netrc_entry = {
netrc
.machines
.iter()
Expand All @@ -55,6 +55,20 @@ pub async fn init_cache(
.ok_or_else(|| Error::BadUrl(flakehub_cache_server.to_owned()))?
.to_string();

let flakehub_login = flakehub_netrc_entry.login.as_ref().ok_or_else(|| {
Error::Config(format!(
"netrc file does not contain a login for '{}'",
flakehub_api_server
))
})?;

let flakehub_password = flakehub_netrc_entry.password.as_ref().ok_or_else(|| {
Error::Config(format!(
"netrc file does not contain a password for '{}'",
flakehub_api_server
))
})?;

// Append an entry for the FlakeHub cache server to netrc.
if !netrc
.machines
Expand All @@ -69,9 +83,8 @@ pub async fn init_cache(
netrc_file
.write_all(
format!(
"\nmachine {} password {}\n\n",
flakehub_cache_server_hostname,
netrc_entry.password.as_ref().unwrap(),
"\nmachine {} login {} password {}\n\n",
flakehub_cache_server_hostname, flakehub_login, flakehub_password,
)
.as_bytes(),
)
Expand All @@ -80,20 +93,18 @@ pub async fn init_cache(

// Get the cache UUID for this project.
let cache_name = {
let github_repo = env::var("GITHUB_REPOSITORY")
.expect("GITHUB_REPOSITORY environment variable is not set");
let github_repo = env::var("GITHUB_REPOSITORY").map_err(|_| {
Error::Config("GITHUB_REPOSITORY environment variable is not set".to_owned())
})?;

let url = flakehub_api_server
.join(&format!("project/{}", github_repo))
.unwrap();
.map_err(|_| Error::Config(format!("bad URL '{}'", flakehub_api_server)))?;

let response = reqwest::Client::new()
.get(url.to_owned())
.header("User-Agent", USER_AGENT)
.basic_auth(
netrc_entry.login.as_ref().unwrap(),
netrc_entry.password.as_ref(),
)
.basic_auth(flakehub_login, Some(flakehub_password))
.send()
.await?;

Expand Down Expand Up @@ -124,7 +135,7 @@ pub async fn init_cache(

let api = ApiClient::from_server_config(ServerConfig {
endpoint: flakehub_cache_server.to_string(),
token: netrc_entry.password.as_ref().cloned(),
token: flakehub_netrc_entry.password.as_ref().cloned(),
})?;

let cache_config = api.get_cache_config(&cache).await?;
Expand Down
124 changes: 69 additions & 55 deletions magic-nix-cache/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

use ::attic::nix_store::NixStore;
use anyhow::{anyhow, Context, Result};
use axum::{extract::Extension, routing::get, Router};
use clap::Parser;
use tempfile::NamedTempFile;
Expand Down Expand Up @@ -139,34 +140,35 @@ struct StateInner {
flakehub_state: RwLock<Option<flakehub::State>>,
}

async fn main_cli() {
async fn main_cli() -> Result<()> {
init_logging();

let args = Args::parse();

create_dir_all(Path::new(&args.nix_conf).parent().unwrap())
.expect("Creating parent directories of nix.conf");
if let Some(parent) = Path::new(&args.nix_conf).parent() {
create_dir_all(parent).with_context(|| "Creating parent directories of nix.conf")?;
}

let mut nix_conf = OpenOptions::new()
.create(true)
.append(true)
.open(args.nix_conf)
.expect("Opening nix.conf");
.with_context(|| "Creating nix.conf")?;

let store = Arc::new(NixStore::connect().expect("Connecting to the Nix store"));
let store = Arc::new(NixStore::connect()?);

let flakehub_state = if args.use_flakehub {
let flakehub_cache_server = args
.flakehub_cache_server
.expect("--flakehub-cache-server is required");
.ok_or_else(|| anyhow!("--flakehub-cache-server is required"))?;
let flakehub_api_server_netrc = args
.flakehub_api_server_netrc
.expect("--flakehub-api-server-netrc is required");
.ok_or_else(|| anyhow!("--flakehub-api-server-netrc is required"))?;

match flakehub::init_cache(
&args
.flakehub_api_server
.expect("--flakehub-api-server is required"),
.ok_or_else(|| anyhow!("--flakehub-api-server is required"))?,
&flakehub_api_server_netrc,
&flakehub_cache_server,
store.clone(),
Expand All @@ -183,7 +185,7 @@ async fn main_cli() {
)
.as_bytes(),
)
.expect("Writing to nix.conf");
.with_context(|| "Writing to nix.conf")?;

tracing::info!("FlakeHub cache is enabled.");
Some(state)
Expand All @@ -201,24 +203,35 @@ async fn main_cli() {
let api = if args.use_gha_cache {
let credentials = if let Some(credentials_file) = &args.credentials_file {
tracing::info!("Loading credentials from {:?}", credentials_file);
let bytes = fs::read(credentials_file).expect("Failed to read credentials file");

serde_json::from_slice(&bytes).expect("Failed to deserialize credentials file")
let bytes = fs::read(credentials_file).with_context(|| {
format!(
"Failed to read credentials file '{}'",
credentials_file.display()
)
})?;

serde_json::from_slice(&bytes).with_context(|| {
format!(
"Failed to deserialize credentials file '{}'",
credentials_file.display()
)
})?
} else {
tracing::info!("Loading credentials from environment");
Credentials::load_from_env()
.expect("Failed to load credentials from environment (see README.md)")
.with_context(|| "Failed to load credentials from environment (see README.md)")?
};

let mut api = Api::new(credentials).expect("Failed to initialize GitHub Actions Cache API");
let mut api = Api::new(credentials)
.with_context(|| "Failed to initialize GitHub Actions Cache API")?;

if let Some(cache_version) = &args.cache_version {
api.mutate_version(cache_version.as_bytes());
}

nix_conf
.write_all(format!("extra-substituters = http://{}?trusted=1&compression=zstd&parallel-compression=true&priority=1\n", args.listen).as_bytes())
.expect("Writing to nix.conf");
.with_context(|| "Writing to nix.conf")?;

tracing::info!("Native GitHub Action cache is enabled.");
Some(api)
Expand All @@ -231,24 +244,27 @@ async fn main_cli() {
* ignores errors, to avoid the Nix build from failing. */
let post_build_hook_script = {
let mut file = NamedTempFile::with_prefix("magic-nix-cache-build-hook-")
.expect("Creating a temporary file");
.with_context(|| "Creating a temporary file for the post-build hook")?;
file.write_all(
format!(
// NOTE(cole-h): We want to exit 0 even if the hook failed, otherwise it'll fail the
// build itself
"#! /bin/sh\nRUST_BACKTRACE=full {} --server {}\nexit 0\n",
std::env::current_exe()
.expect("Getting the path of magic-nix-cache")
.with_context(|| "Getting the path of magic-nix-cache")?
.display(),
args.listen
)
.as_bytes(),
)
.expect("Writing the post-build hook");
file.keep().unwrap().1
.with_context(|| "Writing the post-build hook")?;
file.keep()
.with_context(|| "Keeping the post-build hook")?
.1
};

fs::set_permissions(&post_build_hook_script, fs::Permissions::from_mode(0o755)).unwrap();
fs::set_permissions(&post_build_hook_script, fs::Permissions::from_mode(0o755))
.with_context(|| "Setting permissions on the post-build hook")?;

/* Update nix.conf. */
nix_conf
Expand All @@ -259,7 +275,7 @@ async fn main_cli() {
)
.as_bytes(),
)
.expect("Writing to nix.conf");
.with_context(|| "Writing to nix.conf")?;

drop(nix_conf);

Expand Down Expand Up @@ -309,15 +325,18 @@ async fn main_cli() {
match response {
Ok(response) => {
if !response.status().is_success() {
panic!(
Err(anyhow!(
"Startup notification returned an error: {}\n{}",
response.status(),
response.text().await.unwrap_or_else(|_| "".to_owned())
);
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned())
))?;
}
}
Err(err) => {
panic!("Startup notification failed: {}", err);
err @ Err(_) => {
err.with_context(|| "Startup notification failed")?;
}
}
}
Expand All @@ -334,10 +353,12 @@ async fn main_cli() {
state.metrics.send(diagnostic_endpoint).await;
}

ret.unwrap()
ret?;

Ok(())
}

async fn post_build_hook(out_paths: &str) {
async fn post_build_hook(out_paths: &str) -> Result<()> {
#[derive(Parser, Debug)]
struct Args {
/// `magic-nix-cache` daemon to connect to.
Expand All @@ -357,43 +378,36 @@ async fn post_build_hook(out_paths: &str) {
let response = reqwest::Client::new()
.post(format!("http://{}/api/enqueue-paths", &args.server))
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(serde_json::to_string(&request).unwrap())
.body(
serde_json::to_string(&request)
.with_context(|| "Decoding the response from the magic-nix-cache server")?,
)
.send()
.await;

let mut err_message = None;
match response {
Ok(response) if !response.status().is_success() => {
err_message = Some(format!(
"magic-nix-cache server failed to enqueue the push request: {}\n{}",
response.status(),
response.text().await.unwrap_or_else(|_| "".to_owned()),
));
}
Ok(response) => {
let enqueue_paths_response = response.json::<api::EnqueuePathsResponse>().await;
if let Err(err) = enqueue_paths_response {
err_message = Some(format!(
"magic-nix-cache-server didn't return a valid response: {}",
err
))
}
}
Ok(response) if !response.status().is_success() => Err(anyhow!(
"magic-nix-cache server failed to enqueue the push request: {}\n{}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| "<no response text>".to_owned()),
))?,
Ok(response) => response
.json::<api::EnqueuePathsResponse>()
.await
.with_context(|| "magic-nix-cache-server didn't return a valid response")?,
Err(err) => {
err_message = Some(format!(
"magic-nix-cache server failed to send the enqueue request: {}",
err
));
Err(err).with_context(|| "magic-nix-cache server failed to send the enqueue request")?
}
}
};

if let Some(err_message) = err_message {
eprintln!("{err_message}");
}
Ok(())
}

#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
match std::env::var("OUT_PATHS") {
Ok(out_paths) => post_build_hook(&out_paths).await,
Err(_) => main_cli().await,
Expand Down
Loading

0 comments on commit 308fa51

Please sign in to comment.