Skip to content

Commit

Permalink
feat: introduce small object disk cache (#658)
Browse files Browse the repository at this point in the history
* stash, todo: rebase me

* fix: fix sodc checksum and test

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: make store builder support mixed disk cache engine

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: impl the new builder, update bench and examples

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: make clippy happy

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix engine clap

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: update eviction picker API

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix small batch copy

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: upgrade deps

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix set panic on invalid len

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: impl better bloom filter

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* feat: add options for foyer-bench to control small engine

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: rename a const

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* test: try add test for small batch

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: make ffmt happy

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: refine the small engine to fit the new design

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: make ffmt happy

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: use 12B entry header for small entry

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix stats for small object disk cache

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* test: add ci for mixed engine

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: fix ci

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx authored Sep 26, 2024
1 parent b6524e2 commit 656055a
Show file tree
Hide file tree
Showing 32 changed files with 2,544 additions and 557 deletions.
16 changes: 12 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ jobs:
CI: true
run: |
mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov
cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60
cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60
cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60
cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60
- name: Generate codecov report
run: |
cargo llvm-cov report --lcov --output-path lcov.info
Expand Down Expand Up @@ -240,7 +242,9 @@ jobs:
run: |-
cargo build --all --features deadlock
mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-storage/deadlock
timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60
timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60
timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60
timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60
asan:
name: run with address saniziter
runs-on: ubuntu-latest
Expand Down Expand Up @@ -278,7 +282,9 @@ jobs:
run: |-
cargo +${{ env.RUST_TOOLCHAIN_NIGHTLY }} build --all --target x86_64-unknown-linux-gnu
mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60
- name: Prepare Artifacts on Failure
if: ${{ failure() }}
run: |-
Expand Down Expand Up @@ -326,7 +332,9 @@ jobs:
run: |-
cargo +${{ env.RUST_TOOLCHAIN_NIGHTLY }} build --all --target x86_64-unknown-linux-gnu
mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60
timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60
- name: Prepare Artifacts on Failure
if: ${{ failure() }}
run: |-
Expand Down
4 changes: 2 additions & 2 deletions examples/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder};
use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dir = tempfile::tempdir()?;

let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
.memory(64 * 1024 * 1024)
.storage()
.storage(Engine::Large) // use large object disk cache engine only
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(256 * 1024 * 1024)
Expand Down
41 changes: 25 additions & 16 deletions examples/hybrid_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::Arc;
use anyhow::Result;
use chrono::Datelike;
use foyer::{
DirectFsDeviceOptionsBuilder, FifoPicker, HybridCache, HybridCacheBuilder, LruConfig, RateLimitPicker, RecoverMode,
RuntimeConfig, TokioRuntimeConfig, TombstoneLogConfigBuilder,
DirectFsDeviceOptionsBuilder, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, LruConfig,
RateLimitPicker, RecoverMode, RuntimeConfig, SmallEngineOptions, TokioRuntimeConfig, TombstoneLogConfigBuilder,
};
use tempfile::tempdir;

Expand All @@ -35,30 +35,17 @@ async fn main() -> Result<()> {
.with_object_pool_capacity(1024)
.with_hash_builder(ahash::RandomState::default())
.with_weighter(|_key, value: &String| value.len())
.storage()
.storage(Engine::Mixed(0.1))
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(64 * 1024 * 1024)
.with_file_size(4 * 1024 * 1024)
.build(),
)
.with_flush(true)
.with_indexer_shards(64)
.with_recover_mode(RecoverMode::Quiet)
.with_recover_concurrency(8)
.with_flushers(2)
.with_reclaimers(2)
.with_buffer_pool_size(256 * 1024 * 1024)
.with_clean_region_threshold(4)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_admission_picker(Arc::new(RateLimitPicker::new(100 * 1024 * 1024)))
.with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024)))
.with_compression(foyer::Compression::Lz4)
.with_tombstone_log_config(
TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file"))
.with_flush(true)
.build(),
)
.with_runtime_config(RuntimeConfig::Separated {
read_runtime_config: TokioRuntimeConfig {
worker_threads: 4,
Expand All @@ -69,6 +56,28 @@ async fn main() -> Result<()> {
max_blocking_threads: 8,
},
})
.with_large_object_disk_cache_options(
LargeEngineOptions::new()
.with_indexer_shards(64)
.with_recover_concurrency(8)
.with_flushers(2)
.with_reclaimers(2)
.with_buffer_pool_size(256 * 1024 * 1024)
.with_clean_region_threshold(4)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024)))
.with_tombstone_log_config(
TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file"))
.with_flush(true)
.build(),
),
)
.with_small_object_disk_cache_options(
SmallEngineOptions::new()
.with_set_size(16 * 1024)
.with_set_cache_capacity(64)
.with_flushers(2),
)
.build()
.await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/tail_based_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::time::Duration;

use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder};
use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder};

#[cfg(feature = "jaeger")]
fn init_jaeger_exporter() {
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn main() -> anyhow::Result<()> {

let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
.memory(64 * 1024 * 1024)
.storage()
.storage(Engine::Large)
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(256 * 1024 * 1024)
Expand Down
54 changes: 38 additions & 16 deletions foyer-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use analyze::{analyze, monitor, Metrics};
use bytesize::ByteSize;
use clap::{builder::PossibleValuesParser, ArgGroup, Parser};
use foyer::{
Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache,
HybridCacheBuilder, InvalidRatioPicker, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeConfig,
S3FifoConfig, TokioRuntimeConfig, TracingConfig,
Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, Engine, FifoConfig, FifoPicker,
HybridCache, HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker,
RecoverMode, RuntimeConfig, S3FifoConfig, SmallEngineOptions, TokioRuntimeConfig, TracingConfig,
};
use futures::future::join_all;
use itertools::Itertools;
Expand Down Expand Up @@ -204,6 +204,11 @@ pub struct Args {
#[arg(long, value_enum, default_value_t = Compression::None)]
compression: Compression,

// TODO(MrCroxx): use mixed engine by default.
/// Disk cache engine.
#[arg(long, default_value_t = Engine::Large)]
engine: Engine,

/// Time-series operation distribution.
///
/// Available values: "none", "uniform", "zipf".
Expand Down Expand Up @@ -233,6 +238,12 @@ pub struct Args {
#[arg(long, value_parser = PossibleValuesParser::new(["lru", "lfu", "fifo", "s3fifo"]), default_value = "lru")]
eviction: String,

#[arg(long, default_value_t = ByteSize::kib(16))]
set_size: ByteSize,

#[arg(long, default_value_t = 64)]
set_cache_capacity: usize,

/// Record insert trace threshold. Only effective with "mtrace" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_insert_us: usize,
Expand Down Expand Up @@ -448,7 +459,7 @@ async fn benchmark(args: Args) {

let mut builder = builder
.with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len())
.storage();
.storage(args.engine);

builder = match (args.file.as_ref(), args.dir.as_ref()) {
(Some(file), None) => builder.with_device_config(
Expand All @@ -468,15 +479,7 @@ async fn benchmark(args: Args) {

builder = builder
.with_flush(args.flush)
.with_indexer_shards(args.shards)
.with_recover_mode(args.recover_mode)
.with_recover_concurrency(args.recover_concurrency)
.with_flushers(args.flushers)
.with_reclaimers(args.reclaimers)
.with_eviction_pickers(vec![
Box::new(InvalidRatioPicker::new(args.invalid_ratio)),
Box::<FifoPicker>::default(),
])
.with_compression(args.compression)
.with_runtime_config(match args.runtime.as_str() {
"disabled" => RuntimeConfig::Disabled,
Expand All @@ -497,20 +500,39 @@ async fn benchmark(args: Args) {
_ => unreachable!(),
});

let mut large = LargeEngineOptions::new()
.with_indexer_shards(args.shards)
.with_recover_concurrency(args.recover_concurrency)
.with_flushers(args.flushers)
.with_reclaimers(args.reclaimers)
.with_eviction_pickers(vec![
Box::new(InvalidRatioPicker::new(args.invalid_ratio)),
Box::<FifoPicker>::default(),
]);

let small = SmallEngineOptions::new()
.with_flushers(args.flushers)
.with_set_size(args.set_size.as_u64() as _)
.with_set_cache_capacity(args.set_cache_capacity);

if args.admission_rate_limit.as_u64() > 0 {
builder =
builder.with_admission_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
}
if args.reinsertion_rate_limit.as_u64() > 0 {
builder =
builder.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
large = large.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
}

if args.clean_region_threshold > 0 {
builder = builder.with_clean_region_threshold(args.clean_region_threshold);
large = large.with_clean_region_threshold(args.clean_region_threshold);
}

let hybrid = builder.build().await.unwrap();
let hybrid = builder
.with_large_object_disk_cache_options(large)
.with_small_object_disk_cache_options(small)
.build()
.await
.unwrap();

#[cfg(feature = "mtrace")]
hybrid.enable_tracing();
Expand Down
74 changes: 74 additions & 0 deletions foyer-common/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,78 @@ impl SingletonHandle {
{
self.0.spawn_blocking(func)
}

/// Runs a future to completion on this `Handle`'s associated `Runtime`.
///
/// This is a thin wrapper that forwards to the inner handle's `block_on`.
///
/// This runs the given future on the current thread, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// When this is used on a `current_thread` runtime, only the
/// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
/// `Handle::block_on` method cannot drive them. This means that, when using
/// this method on a `current_thread` runtime, anything that relies on IO or
/// timers will not work unless there is another thread currently calling
/// [`Runtime::block_on`] on the same runtime.
///
/// # If the runtime has been shut down
///
/// If the `Handle`'s associated `Runtime` has been shut down (through
/// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
/// dropping it) and `Handle::block_on` is used it might return an error or
/// panic. Specifically IO resources will return an error and timers will
/// panic. Runtime independent futures will run as normal.
///
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
///
/// # Examples
///
/// The examples below show the underlying tokio `Handle::block_on` call that
/// this method forwards to.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Execute the future, blocking the current thread until completion
/// handle.block_on(async {
///     println!("hello");
/// });
/// ```
///
/// Or using `Handle::current`:
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main () {
///     let handle = Handle::current();
///     std::thread::spawn(move || {
///         // Using Handle::block_on to run async code in the new thread.
///         handle.block_on(async {
///             println!("hello");
///         });
///     });
/// }
/// ```
///
/// [`Runtime::block_on`]: fn@tokio::runtime::Runtime::block_on
/// [`Runtime::shutdown_background`]: fn@tokio::runtime::Runtime::shutdown_background
/// [`Runtime::shutdown_timeout`]: fn@tokio::runtime::Runtime::shutdown_timeout
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
// NOTE(review): the documentation above is adapted from tokio's `Handle::block_on`;
// it presumes `self.0` is a `tokio::runtime::Handle` — confirm against the struct definition.
self.0.block_on(future)
}
}
2 changes: 2 additions & 0 deletions foyer-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ futures = "0.3"
itertools = { workspace = true }
libc = "0.2"
lz4 = "1.24"
ordered_hash_map = "0.4"
parking_lot = { version = "0.12", features = ["arc_lock"] }
paste = "1"
pin-project = "1"
rand = "0.8"
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/device/direct_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Dev for DirectFileDevice {

if file.metadata().unwrap().is_file() {
tracing::warn!(
"{}\n{}\n{}",
"{} {} {}",
"It seems a `DirectFileDevice` is used within a normal file system, which is inefficient.",
"Please use `DirectFileDevice` directly on a raw block device.",
"Or use `DirectFsDevice` within a normal file system.",
Expand Down
Loading

0 comments on commit 656055a

Please sign in to comment.