Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RocksDB storage and self-contained RevIndex with internal storage #3250

Merged
merged 17 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
cargo-deny
cargo-wasi
cargo-codspeed
#cargo-semver-checks
cargo-semver-checks
nixpkgs-fmt
];

Expand Down
18 changes: 18 additions & 0 deletions src/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [unreleased]

## [0.15.0] - 2024-07-27

MSRV: 1.65

Changes/additions:

* RocksDB storage and self-contained RevIndex with internal storage (#3250)
* Enable codspeed for Rust perf tracking (#3231)

Updates:

* Bump roaring from 0.10.5 to 0.10.6 (#3245)
* Bump serde from 1.0.203 to 1.0.204 (#3244)
* Bump counter from 0.5.7 to 0.6.0 (#3235)
* Bump log from 0.4.21 to 0.4.22 (#3236)
* Bump serde_json from 1.0.117 to 1.0.120 (#3234)
* Bump proptest from 1.4.0 to 1.5.0 (#3222)

## [0.14.1] - 2024-06-19

MSRV: 1.65
Expand Down
13 changes: 7 additions & 6 deletions src/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sourmash"
version = "0.14.1"
version = "0.15.0"
authors = ["Luiz Irber <luiz@sourmash.bio>", "N. Tessa Pierce-Ward <tessa@sourmash.bio>"]
description = "tools for comparing biological sequences with k-mer sketches"
repository = "https://github.com/sourmash-bio/sourmash"
Expand All @@ -19,10 +19,11 @@ crate-type = ["lib", "staticlib", "cdylib"]
bench = false

[features]
from-finch = ["finch"]
parallel = ["rayon"]
from-finch = ["dep:finch"]
parallel = ["dep:rayon"]
maturin = []
branchwater = ["rocksdb", "rkyv", "parallel"]
branchwater = ["dep:rocksdb", "parallel"]
rkyv = ["dep:rkyv"]
default = []

[dependencies]
Expand All @@ -35,7 +36,6 @@ csv = "1.3.0"
enum_dispatch = "0.3.13"
finch = { version = "0.6.0", optional = true }
fixedbitset = "0.4.0"
getrandom = { version = "0.2", features = ["js"] }
getset = "0.1.1"
histogram = "0.11.0"
itertools = "0.13.0"
Expand Down Expand Up @@ -98,7 +98,8 @@ skip_feature_sets = [
[target.'cfg(all(target_arch = "wasm32", target_os="unknown"))'.dependencies]
js-sys = "0.3.68"
web-sys = { version = "0.3.69", features = ["console", "File", "FileReaderSync"] }
wasm-bindgen = { version = "0.2.89", features = ["serde-serialize"] }
wasm-bindgen = "0.2.89"
getrandom = { version = "0.2", features = ["js"] }

[target.'cfg(all(target_arch = "wasm32"))'.dependencies]
chrono = { version = "0.4.32", features = ["wasmbind"] }
Expand Down
11 changes: 11 additions & 0 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@
pub fn selection(&self) -> Selection {
// Placeholder: not implemented yet, so calling this method panics via `todo!`.
todo!("Extract selection from first sig")
}

/// Replace the storage with a new one.
///
/// The value previously held in `self.storage` is overwritten by `storage`.
///
/// # Safety
///
/// This method doesn't check if the manifest matches what is in the
/// storage (which can be expensive). It is up to the caller to
/// guarantee the manifest and storage are in sync.
pub unsafe fn set_storage_unchecked(&mut self, storage: InnerStorage) {

Check warning on line 75 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L75

Added line #L75 was not covered by tests
// Plain assignment: no validation against the manifest happens here.
self.storage = storage;
}
}

impl Collection {
Expand Down
91 changes: 50 additions & 41 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@
use byteorder::{LittleEndian, WriteBytesExt};
use log::{info, trace};
use rayon::prelude::*;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options};
use rocksdb::MergeOperands;

use crate::collection::{Collection, CollectionSet};
use crate::encodings::{Color, Idx};
use crate::index::revindex::{
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, DB,
HASHES, MANIFEST, METADATA, STORAGE_SPEC, VERSION,
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
MANIFEST, STORAGE_SPEC, VERSION,
};
use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, Storage};
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
InnerStorage, RocksDBStorage, Storage,
};
use crate::Result;

const DB_VERSION: u8 = 1;
Expand All @@ -37,7 +40,7 @@
collection: Arc<CollectionSet>,
}

fn merge_datasets(
pub(crate) fn merge_datasets(
_: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
Expand All @@ -64,10 +67,9 @@

impl RevIndex {
pub fn create(path: &Path, collection: CollectionSet) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
let mut opts = db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

// prepare column family descriptors
let cfs = cf_descriptors();
Expand Down Expand Up @@ -104,10 +106,7 @@
read_only: bool,
storage_spec: Option<&str>,
) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
if !read_only {
opts.prepare_for_bulk_load();
}
let opts = db_options();

// prepare column family descriptors
let cfs = cf_descriptors();
Expand Down Expand Up @@ -152,7 +151,7 @@
};

let storage = if spec == "rocksdb://" {
todo!("init storage from db")
InnerStorage::new(RocksDBStorage::from_db(db.clone()))
} else {
InnerStorage::from_spec(spec)?
};
Expand Down Expand Up @@ -455,6 +454,45 @@
Ok(())
}

/// Borrow the collection held by this index.
fn collection(&self) -> &CollectionSet {
// `self.collection` is an `Arc<CollectionSet>`; the borrow derefs to the inner set.
&self.collection
}

fn internalize_storage(&mut self) -> Result<()> {
// check if collection is already internal, if so return
if self.collection.storage().spec() == "rocksdb://" {
return Ok(());

Check warning on line 464 in src/core/src/index/revindex/disk_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/disk_revindex.rs#L464

Added line #L464 was not covered by tests
}

// build new rocksdb storage from db
let new_storage = RocksDBStorage::from_db(self.db.clone());

// use manifest to copy from current storage to new one
self.collection()
.par_iter()
.try_for_each(|(_, record)| -> Result<()> {
let path = record.internal_location().as_str();
let sig_data = self.collection.storage().load(path).unwrap();
new_storage.save(path, &sig_data)?;
Ok(())
})?;

// Replace storage for collection.
// Using unchecked version because we just used the manifest
// above to make sure the storage is still consistent
unsafe {
Arc::get_mut(&mut self.collection)
.map(|v| v.set_storage_unchecked(InnerStorage::new(new_storage)));
}

// write storage spec
let cf_metadata = self.db.cf_handle(METADATA).unwrap();
let spec = "rocksdb://";
self.db.put_cf(&cf_metadata, STORAGE_SPEC, spec)?;

Ok(())
}

fn convert(&self, _output_db: module::RevIndex) -> Result<()> {
todo!()
/*
Expand Down Expand Up @@ -497,32 +535,3 @@
*/
}
}

/// Build the column family descriptors for the `HASHES` and `METADATA`
/// column families, in that order.
///
/// `HASHES` additionally registers the associative `merge_datasets` merge
/// operator under the name `"datasets operator"`.
fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
    let mut hashes_opts = Options::default();
    hashes_opts.set_max_write_buffer_number(16);
    hashes_opts.set_merge_operator_associative("datasets operator", merge_datasets);
    hashes_opts.set_min_write_buffer_number_to_merge(10);
    // Updated default from
    // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
    hashes_opts.set_level_compaction_dynamic_level_bytes(true);
    let cf_hashes = ColumnFamilyDescriptor::new(HASHES, hashes_opts);

    let mut metadata_opts = Options::default();
    metadata_opts.set_max_write_buffer_number(16);
    // Updated default (same setting as for HASHES)
    metadata_opts.set_level_compaction_dynamic_level_bytes(true);
    let cf_metadata = ColumnFamilyDescriptor::new(METADATA, metadata_opts);

    vec![cf_hashes, cf_metadata]
}
Loading
Loading