Skip to content

Commit

Permalink
refactor(merge_pipe): helper macro
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed Dec 4, 2022
1 parent 35fca1f commit 5279e69
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 63 deletions.
29 changes: 8 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod gradle;
mod homebrew;
mod html_scanner;
mod index_pipe;
#[macro_use]
mod merge_pipe;
mod metadata;
mod opts;
Expand Down Expand Up @@ -289,27 +290,13 @@ fn main() {
true,
);

let unified = merge_pipe::MergePipe::new(
packages_src,
merge_pipe::MergePipe::new(
hls_src,
merge_pipe::MergePipe::new(
stack_src,
merge_pipe::MergePipe::new(
yaml_src,
script_src,
String::from("yaml"),
Some(String::from("script")),
),
String::from("stack"),
None,
),
String::from("hls"),
None,
),
String::from("packages"),
None,
);
let unified = merge_pipe! {
packages: packages_src,
hls: hls_src,
stack: stack_src,
yaml: yaml_src,
script: script_src,
};

let indexed = index_pipe::IndexPipe::new(
unified,
Expand Down
87 changes: 45 additions & 42 deletions src/merge_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
//! In such case, several different sources of distinct base urls can be
//! implemented, and they should be unified by `MergePipe` later.
//!
//! You may use `MergePipe` recursively if you have more than two sources
//! to merge by setting the second prefix as `None`.
//! # Example
//! ```ignore
//! merge_pipe! {
//! metadata: MetaSource,
//! pkg: PackageSource,
//! }

use async_trait::async_trait;
use slog::info;
Expand All @@ -15,33 +19,30 @@ use crate::common::{Mission, SnapshotConfig};
use crate::error::{Error, Result};
use crate::traits::{Key, SnapshotStorage, SourceStorage};

/// Generate MergePipe from a list of sources.
macro_rules! merge_pipe {
($name:ident: $source:expr, $($tt: tt)+) => {
crate::merge_pipe::MergePipe::new(stringify!($name), $source, merge_pipe!($($tt)+))
};
($name:ident: $source:expr $(,)?) => {
crate::merge_pipe::MergePipe::new(stringify!($name), $source, crate::merge_pipe::NilPipe)
};
}

pub struct MergePipe<Source1, Source2> {
prefix: String,
s1: Source1,
s2: Source2,
s1_prefix: String,
s2_prefix: Option<String>,
}

impl<Source1, Source2> MergePipe<Source1, Source2> {
pub fn new(s1: Source1, s2: Source2, s1_prefix: String, s2_prefix: Option<String>) -> Self {
let s1_prefix = if s1_prefix.ends_with('/') {
s1_prefix
pub fn new(prefix: &str, s1: Source1, s2: Source2) -> Self {
let prefix = if prefix.ends_with('/') {
prefix.to_string()
} else {
format!("{}/", s1_prefix)
format!("{}/", prefix)
};
let s2_prefix = s2_prefix.map(|prefix| {
if prefix.ends_with('/') {
prefix
} else {
format!("{}/", prefix)
}
});
Self {
s1,
s2,
s1_prefix,
s2_prefix,
}
Self { prefix, s1, s2 }
}
}

Expand All @@ -59,23 +60,13 @@ where
) -> Result<Vec<SnapshotItem>> {
let logger = mission.logger.clone();

info!(logger, "merge_pipe: snapshotting {}", self.s1_prefix);
info!(logger, "merge_pipe: snapshotting {}", self.prefix);
let mut snapshot1 = self.s1.snapshot(mission.clone(), config).await?;
snapshot1.iter_mut().for_each(|item| {
*item.key_mut() = format!("{}{}", self.s1_prefix, item.key());
*item.key_mut() = format!("{}{}", self.prefix, item.key());
});

if let Some(prefix) = &self.s2_prefix {
info!(logger, "merge_pipe: snapshotting {}", prefix);
} else {
info!(logger, "merge_pipe: snapshotting remaining source(s)");
}
let mut snapshot2 = self.s2.snapshot(mission.clone(), config).await?;
if let Some(prefix) = &self.s2_prefix {
snapshot2.iter_mut().for_each(|item| {
*item.key_mut() = format!("{}{}", prefix, item.key());
});
}

snapshot1.append(&mut snapshot2);

Expand All @@ -99,20 +90,32 @@ where
async fn get_object(&self, snapshot: &SnapshotItem, mission: &Mission) -> Result<Source> {
let path = snapshot.key();

if let Some(key) = path.strip_prefix(&self.s1_prefix) {
if let Some(key) = path.strip_prefix(&self.prefix) {
let mut snapshot = snapshot.clone();
*snapshot.key_mut() = String::from(key);
self.s1.get_object(&snapshot, mission).await
} else if let Some(prefix) = &self.s2_prefix {
if let Some(key) = path.strip_prefix(prefix) {
let mut snapshot = snapshot.clone();
*snapshot.key_mut() = String::from(key);
self.s2.get_object(&snapshot, mission).await
} else {
Err(Error::PipeError(String::from("unexpected prefix")))
}
} else {
self.s2.get_object(snapshot, mission).await
}
}
}

pub struct NilPipe;

#[async_trait]
impl<T> SnapshotStorage<T> for NilPipe {
async fn snapshot(&mut self, _: Mission, _: &SnapshotConfig) -> Result<Vec<T>> {
Ok(vec![])
}

fn info(&self) -> String {
String::from("nil")
}
}

#[async_trait]
impl<T: Sync, U> SourceStorage<T, U> for NilPipe {
async fn get_object(&self, _: &T, _: &Mission) -> Result<U> {
Err(Error::PipeError(String::from("unexpected prefix")))
}
}

0 comments on commit 5279e69

Please sign in to comment.