Skip to content

Commit

Permalink
cover path functions for rocksdb storage
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Jul 25, 2024
1 parent 2af23f9 commit 600915a
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 18 deletions.
25 changes: 11 additions & 14 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -38,7 +38,6 @@ fn compute_color(idxs: &Datasets) -> Color {
/// Disk-backed reverse index: a shared database handle together with the
/// collection whose entries it indexes.
///
/// NOTE(review): this listing interleaves removed and added diff lines; the
/// `path` field below appears to be the one dropped by this commit (the later
/// constructors build `Self { db, collection }`) — confirm against the file.
pub struct RevIndex {
// Shared handle to the underlying database (cloned into `RocksDBStorage::from_db`).
db: Arc<DB>,
// Shared collection; iterated as (dataset id, record) pairs elsewhere in this file.
collection: Arc<CollectionSet>,
// Location the index was created at / opened from.
path: PathBuf,
}

pub(crate) fn merge_datasets(
Expand Down Expand Up @@ -82,7 +81,6 @@ impl RevIndex {
let index = Self {
db,
collection: Arc::new(collection),
path: path.into(),
};

index.collection.par_iter().for_each(|(dataset_id, _)| {
Expand All @@ -108,7 +106,7 @@ impl RevIndex {
read_only: bool,
storage_spec: Option<&str>,
) -> Result<module::RevIndex> {
let mut opts = db_options();
let opts = db_options();

// prepare column family descriptors
let cfs = cf_descriptors();
Expand All @@ -129,11 +127,7 @@ impl RevIndex {
storage_spec,
)?);

Ok(module::RevIndex::Plain(Self {
db,
collection,
path: path.as_ref().into(),
}))
Ok(module::RevIndex::Plain(Self { db, collection }))
}

fn load_collection_from_rocksdb(
Expand Down Expand Up @@ -471,11 +465,14 @@ impl RevIndexOps for RevIndex {
let new_storage = RocksDBStorage::from_db(self.db.clone());

// use manifest to copy from current storage to new one
self.collection().par_iter().for_each(|(_, record)| {
let path = record.internal_location().as_str();
let sig_data = self.collection.storage().load(path).unwrap();
new_storage.save(path, &sig_data);
});
self.collection()
.par_iter()
.try_for_each(|(_, record)| -> Result<()> {
let path = record.internal_location().as_str();
let sig_data = self.collection.storage().load(path).unwrap();
new_storage.save(path, &sig_data)?;
Ok(())
})?;

// Replace storage for collection.
// Using unchecked version because we just used the manifest
Expand Down
74 changes: 73 additions & 1 deletion src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ mod test {
use crate::collection::Collection;
use crate::prelude::*;
use crate::selection::Selection;
use crate::storage::{InnerStorage, RocksDBStorage};
use crate::Result;

use super::{prepare_query, RevIndex, RevIndexOps};
Expand Down Expand Up @@ -931,7 +932,7 @@ mod test {
assert_eq!(matches_external, matches_internal);
}
let new_path = outdir.path().join("new_index_path");
std::fs::rename(output.as_path(), new_path.as_path());
std::fs::rename(output.as_path(), new_path.as_path())?;

let index = RevIndex::open(new_path, false, None)?;

Expand All @@ -949,4 +950,75 @@ mod test {

Ok(())
}

#[test]
/// Round-trips signatures through a RocksDB-backed storage opened by path.
///
/// Builds an index from a zip collection, internalizes the signature data into
/// the index database, then checks that both `RocksDBStorage::from_path` and
/// `InnerStorage::from_spec` (using the spec reported by the storage) return
/// the same bytes as the original zip storage for every record.
fn rocksdb_storage_from_path() -> Result<()> {
    let zip_collection = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("../../tests/test-data/track_abund/track_abund.zip");

    let outdir = TempDir::new()?;

    // `join` already yields a `PathBuf`; no need to round-trip through `String`
    // (which would also panic on a non-UTF-8 temp dir).
    let zip_copy = outdir.path().join("sigs.zip");
    std::fs::copy(&zip_collection, &zip_copy)?;

    let selection = Selection::builder().ksize(31).scaled(10000).build();
    let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
    let output = outdir.path().join("index");
    // The storage API under test takes the path as `&str`.
    let output_str = output
        .to_str()
        .expect("temporary index path should be valid UTF-8");

    // Step 1: create an index
    let mut index = RevIndex::create(output.as_path(), collection.try_into()?, false)?;

    // Step 2: internalize the storage for the index
    index
        .internalize_storage()
        .expect("Error internalizing storage");
    // Release the index before the same directory is opened again below
    // (presumably required because the database can only be opened once).
    drop(index);

    // Step 3: load rocksdb storage from path
    // should have the same content as zipfile

    // Iter thru collection, make sure all records are present
    let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
    assert_eq!(collection.len(), 2);
    let col_storage = collection.storage();

    // Compares the bytes stored under `location` in the zip storage against
    // the bytes a candidate storage returned for the same location.
    let expected = |location: &str| col_storage.load(location).unwrap();

    let spec = {
        // Scoped so this handle is dropped before the spec-based open in step 4.
        let rdb_storage = RocksDBStorage::from_path(output_str);
        for (_, record) in collection.iter() {
            let location = record.internal_location().as_str();
            assert_eq!(rdb_storage.load(location).unwrap(), expected(location));
        }
        rdb_storage.spec()
    };

    // Step 4: verify rocksdb storage spec
    assert_eq!(spec, format!("rocksdb://{}", output_str));

    let storage = InnerStorage::from_spec(spec)?;
    for (_, record) in collection.iter() {
        let location = record.internal_location().as_str();
        assert_eq!(storage.load(location).unwrap(), expected(location));
    }

    Ok(())
}
}
4 changes: 1 addition & 3 deletions src/core/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl RocksDBStorage {

let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap());

// TODO: save storage_args

Self { db }
}

Expand Down Expand Up @@ -62,7 +60,7 @@ impl Storage for RocksDBStorage {
}

/// Returns the spec string identifying this storage, using the `rocksdb://`
/// scheme.
// NOTE(review): both sides of the diff are shown here. The first expression
// (scheme only) looks like the removed line and the `format!` call (scheme
// followed by the database path) the added one — confirm against the file.
fn spec(&self) -> String {
"rocksdb://".into()
format!("rocksdb://{}", self.db.path().display())
}
}

Expand Down

0 comments on commit 600915a

Please sign in to comment.