refactor LruArchiveBackend to only attempt caching if layer fits
matko committed Jun 16, 2023
1 parent 7a8f0f8 commit 1d7bdf2
Showing 2 changed files with 64 additions and 54 deletions.
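In short: the old LruArchiveBackend<T> wrapped a single origin and relied on the separate LruMetadataArchiveBackend wrapper for metadata calls. This commit merges the two into LruArchiveBackend<M, D>, which carries both a metadata_origin and a data_origin, and adds a layer_fits_in_cache check (layer size compared against the backend's limit_bytes()) so that layers too large to ever fit in the cache are served directly from the data origin instead of being pulled into the LRU. The snippet below is not part of the commit; it is a simplified, synchronous sketch of that gating decision, with made-up names (LruBackendSketch, describe_access), whereas the real code is async and generic over ArchiveMetadataBackend / ArchiveBackend.

// Simplified model of the gating logic introduced by this commit.
struct LruBackendSketch {
    limit_bytes: usize,
}

impl LruBackendSketch {
    // Mirrors layer_fits_in_cache: a layer is a caching candidate only if
    // its total size does not exceed the cache limit.
    fn layer_fits_in_cache(&self, layer_size: u64) -> bool {
        layer_size as usize <= self.limit_bytes
    }

    // Mirrors the new branches in get_layer_structure_bytes and
    // read_layer_structure_bytes_from: small layers go through the LRU
    // cache, oversized layers are delegated to the data origin.
    fn describe_access(&self, layer_size: u64) -> &'static str {
        if self.layer_fits_in_cache(layer_size) {
            "cache the whole layer, then serve slices from memory"
        } else {
            "bypass the cache and read from the data origin"
        }
    }
}

fn main() {
    let backend = LruBackendSketch { limit_bytes: 1024 * 1024 }; // 1 MiB cache in this sketch
    assert!(backend.layer_fits_in_cache(64 * 1024));
    assert!(!backend.layer_fits_in_cache(16 * 1024 * 1024));
    println!("{}", backend.describe_access(16 * 1024 * 1024));
}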
104 changes: 57 additions & 47 deletions src/storage/archive.rs
@@ -8,10 +8,11 @@ use std::{
collections::HashMap,
io::{self, ErrorKind, SeekFrom},
ops::Range,
+ os::unix::prelude::MetadataExt,
path::PathBuf,
pin::Pin,
sync::{Arc, RwLock},
- task::Poll, os::unix::prelude::MetadataExt,
+ task::Poll,
};

use async_trait::async_trait;
@@ -21,6 +22,7 @@ use tokio::{
fs::{self, File},
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
};
+ use tokio_util::either::Either;

use crate::structure::{
logarray_length_from_control_word, smallbitarray::SmallBitArray, LateLogArrayBufBuilder,
@@ -316,11 +318,12 @@ impl ArchiveMetadataBackend for DirectoryArchiveBackend {
}

#[derive(Clone)]
- pub struct LruArchiveBackend<T> {
+ pub struct LruArchiveBackend<M, D> {
cache: Arc<tokio::sync::Mutex<LruCache<[u32; 5], CacheEntry>>>,
limit: usize,
current: usize,
- origin: T,
+ metadata_origin: M,
+ data_origin: D,
}

#[derive(Clone)]
@@ -339,15 +342,16 @@ impl CacheEntry {
}
}

- impl<T> LruArchiveBackend<T> {
- pub fn new(origin: T, limit: usize) -> Self {
+ impl<M, D> LruArchiveBackend<M, D> {
+ pub fn new(metadata_origin: M, data_origin: D, limit: usize) -> Self {
let cache = Arc::new(tokio::sync::Mutex::new(LruCache::unbounded()));

Self {
cache,
limit,
current: 0,
- origin,
+ metadata_origin,
+ data_origin,
}
}

@@ -356,6 +360,12 @@ impl<T> LruArchiveBackend<T> {
}
}

+ impl<M: ArchiveMetadataBackend, D: ArchiveBackend> LruArchiveBackend<M, D> {
+ async fn layer_fits_in_cache(&self, id: [u32; 5]) -> io::Result<bool> {
+ Ok(self.layer_size(id).await? as usize <= self.limit_bytes())
+ }
+ }

fn ensure_additional_cache_space(cache: &mut LruCache<[u32; 5], CacheEntry>, mut required: usize) {
if required == 0 {
return;
@@ -417,8 +427,8 @@ fn drop_from_cache(cache: &mut LruCache<[u32; 5], CacheEntry>, id: [u32; 5]) {
}

#[async_trait]
- impl<T: ArchiveBackend> ArchiveBackend for LruArchiveBackend<T> {
- type Read = BytesAsyncReader;
+ impl<M: ArchiveMetadataBackend, D: ArchiveBackend> ArchiveBackend for LruArchiveBackend<M, D> {
+ type Read = Either<BytesAsyncReader, D::Read>;
async fn get_layer_bytes(&self, id: [u32; 5]) -> io::Result<Bytes> {
let mut cache = self.cache.lock().await;
let cached = cache.get(&id).cloned();
@@ -445,7 +455,7 @@ impl<T: ArchiveBackend> ArchiveBackend for LruArchiveBackend<T> {

// drop the cache while doing the lookup
std::mem::drop(cache);
- let lookup = self.origin.get_layer_bytes(id).await;
+ let lookup = self.data_origin.get_layer_bytes(id).await;

*result = Some(lookup.as_ref().map_err(|e| e.kind()).cloned());

@@ -483,12 +493,18 @@ impl<T: ArchiveBackend> ArchiveBackend for LruArchiveBackend<T> {
id: [u32; 5],
file_type: LayerFileEnum,
) -> io::Result<Option<Bytes>> {
- let bytes = self.get_layer_bytes(id).await?;
- let archive = Archive::parse(bytes);
- Ok(archive.slice_for(file_type))
+ if self.layer_fits_in_cache(id).await? {
+ let bytes = self.get_layer_bytes(id).await?;
+ let archive = Archive::parse(bytes);
+ Ok(archive.slice_for(file_type))
+ } else {
+ self.data_origin
+ .get_layer_structure_bytes(id, file_type)
+ .await
+ }
}
async fn store_layer_file(&self, id: [u32; 5], bytes: Bytes) -> io::Result<()> {
- self.origin.store_layer_file(id, bytes.clone()).await?;
+ self.data_origin.store_layer_file(id, bytes.clone()).await?;

let mut cache = self.cache.lock().await;
cache.get_or_insert(id, move || CacheEntry::Resolved(bytes));
@@ -501,66 +517,61 @@ impl<T: ArchiveBackend> ArchiveBackend for LruArchiveBackend<T> {
file_type: LayerFileEnum,
read_from: usize,
) -> io::Result<Self::Read> {
- let mut bytes = self
- .get_layer_structure_bytes(id, file_type)
- .await?
- .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "slice not found in archive"))?;
- bytes.advance(read_from);
-
- Ok(BytesAsyncReader(bytes))
- }
- }
-
- #[derive(Clone)]
- pub struct LruMetadataArchiveBackend<M, D> {
- metadata_backend: M,
- data_backend: LruArchiveBackend<D>,
- }
-
- impl<M, D> LruMetadataArchiveBackend<M, D> {
- pub fn new(metadata_backend: M, data_backend: LruArchiveBackend<D>) -> Self {
- Self {
- metadata_backend,
- data_backend,
+ if self.layer_fits_in_cache(id).await? {
+ let mut bytes = self
+ .get_layer_structure_bytes(id, file_type)
+ .await?
+ .ok_or_else(|| {
+ io::Error::new(io::ErrorKind::NotFound, "slice not found in archive")
+ })?;
+ bytes.advance(read_from);
+
+ Ok(Either::Left(BytesAsyncReader(bytes)))
+ } else {
+ Ok(Either::Right(
+ self.data_origin
+ .read_layer_structure_bytes_from(id, file_type, read_from)
+ .await?,
+ ))
}
}
}

#[async_trait]
impl<M: ArchiveMetadataBackend, D: ArchiveBackend> ArchiveMetadataBackend
- for LruMetadataArchiveBackend<M, D>
+ for LruArchiveBackend<M, D>
{
async fn get_layer_names(&self) -> io::Result<Vec<[u32; 5]>> {
- self.metadata_backend.get_layer_names().await
+ self.metadata_origin.get_layer_names().await
}
async fn layer_exists(&self, id: [u32; 5]) -> io::Result<bool> {
- if let Some(CacheEntry::Resolved(_)) = self.data_backend.cache.lock().await.peek(&id) {
+ if let Some(CacheEntry::Resolved(_)) = self.cache.lock().await.peek(&id) {
Ok(true)
} else {
- self.metadata_backend.layer_exists(id).await
+ self.metadata_origin.layer_exists(id).await
}
}
async fn layer_size(&self, id: [u32; 5]) -> io::Result<u64> {
- if let Some(CacheEntry::Resolved(bytes)) = self.data_backend.cache.lock().await.peek(&id) {
+ if let Some(CacheEntry::Resolved(bytes)) = self.cache.lock().await.peek(&id) {
Ok(bytes.len() as u64)
} else {
- self.metadata_backend.layer_size(id).await
+ self.metadata_origin.layer_size(id).await
}
}
async fn layer_file_exists(&self, id: [u32; 5], file_type: LayerFileEnum) -> io::Result<bool> {
- if let Some(CacheEntry::Resolved(bytes)) = self.data_backend.cache.lock().await.peek(&id) {
+ if let Some(CacheEntry::Resolved(bytes)) = self.cache.lock().await.peek(&id) {
let header = ArchiveFilePresenceHeader::new(bytes.clone().get_u64());
Ok(header.is_present(file_type))
} else {
- self.metadata_backend.layer_file_exists(id, file_type).await
+ self.metadata_origin.layer_file_exists(id, file_type).await
}
}
async fn get_layer_structure_size(
&self,
id: [u32; 5],
file_type: LayerFileEnum,
) -> io::Result<usize> {
- if let Some(CacheEntry::Resolved(bytes)) = self.data_backend.cache.lock().await.peek(&id) {
+ if let Some(CacheEntry::Resolved(bytes)) = self.cache.lock().await.peek(&id) {
let (header, _) = ArchiveHeader::parse(bytes.clone());

if let Some(size) = header.size_of(file_type) {
@@ -575,21 +586,20 @@ impl<M: ArchiveMetadataBackend, D: ArchiveBackend> ArchiveMetadataBackend
))
}
} else {
- self.metadata_backend
+ self.metadata_origin
.get_layer_structure_size(id, file_type)
.await
}
}
async fn get_rollup(&self, id: [u32; 5]) -> io::Result<Option<[u32; 5]>> {
- self.metadata_backend.get_rollup(id).await
+ self.metadata_origin.get_rollup(id).await
}
async fn set_rollup(&self, id: [u32; 5], rollup: [u32; 5]) -> io::Result<()> {
- self.metadata_backend.set_rollup(id, rollup).await
+ self.metadata_origin.set_rollup(id, rollup).await
}

async fn get_parent(&self, id: [u32; 5]) -> io::Result<Option<[u32; 5]>> {
if let Some(parent_bytes) = self
- .data_backend
.get_layer_structure_bytes(id, LayerFileEnum::Parent)
.await?
{
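One consequence visible in the hunks above: since oversized layers are now read from the data origin rather than from cached bytes, the associated Read type widens from BytesAsyncReader to Either<BytesAsyncReader, D::Read>. tokio_util's Either implements AsyncRead when both of its variants do, which is what lets a single method hand back either an in-memory reader or the origin's own reader. Below is a minimal standalone illustration, not code from the commit; it assumes tokio (rt, macros, io-util features) and tokio-util as dependencies.

use std::io::Cursor;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::either::Either;

// Generic reader: accepts either variant, just like the backend's Self::Read.
async fn read_all<R: AsyncRead + Unpin>(mut reader: R) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // "Cached" path: the bytes are already in memory.
    let cached: Either<Cursor<Vec<u8>>, tokio::io::Empty> =
        Either::Left(Cursor::new(b"cached layer bytes".to_vec()));
    assert_eq!(read_all(cached).await?, b"cached layer bytes");

    // "Passthrough" path: whatever reader the data origin hands out
    // (stubbed here with an empty reader).
    let passthrough: Either<Cursor<Vec<u8>>, tokio::io::Empty> =
        Either::Right(tokio::io::empty());
    assert!(read_all(passthrough).await?.is_empty());
    Ok(())
}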
14 changes: 7 additions & 7 deletions src/store/mod.rs
@@ -7,9 +7,7 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use crate::layer::{IdTriple, Layer, LayerBuilder, LayerCounts, ObjectType, ValueTriple};
- use crate::storage::archive::{
- ArchiveLayerStore, DirectoryArchiveBackend, LruArchiveBackend, LruMetadataArchiveBackend,
- };
+ use crate::storage::archive::{ArchiveLayerStore, DirectoryArchiveBackend, LruArchiveBackend};
use crate::storage::directory::{DirectoryLabelStore, DirectoryLayerStore};
use crate::storage::memory::{MemoryLabelStore, MemoryLayerStore};
use crate::storage::{CachedLayerStore, LabelStore, LayerStore, LockingHashMapLayerCache};
@@ -921,13 +919,15 @@ pub fn open_memory_store() -> Store {
pub fn open_archive_store<P: Into<PathBuf>>(path: P, cache_size: usize) -> Store {
let p = path.into();
let directory_archive_backend = DirectoryArchiveBackend::new(p.clone());
- let archive_backend = LruArchiveBackend::new(directory_archive_backend.clone(), cache_size);
- let archive_metadata_backend =
- LruMetadataArchiveBackend::new(directory_archive_backend, archive_backend.clone());
+ let archive_backend = LruArchiveBackend::new(
+ directory_archive_backend.clone(),
+ directory_archive_backend,
+ cache_size,
+ );
Store::new(
DirectoryLabelStore::new(p),
CachedLayerStore::new(
- ArchiveLayerStore::new(archive_metadata_backend, archive_backend),
+ ArchiveLayerStore::new(archive_backend.clone(), archive_backend),
LockingHashMapLayerCache::new(),
),
)
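Callers are unaffected by the rewiring above: open_archive_store keeps its signature and simply passes the same DirectoryArchiveBackend in as both the metadata origin and the data origin of one LruArchiveBackend. A hypothetical caller-side sketch follows; the crate-root re-export of open_archive_store is an assumption, as is the unit of cache_size, which depends on limit_bytes() and is not shown in this diff.

fn main() {
    // Hypothetical usage: assumes open_archive_store is re-exported at the crate root.
    // The cache_size unit depends on LruArchiveBackend::limit_bytes(); layers larger
    // than the resulting limit now bypass the LRU cache entirely.
    let cache_size = 100;
    let _store = terminus_store::open_archive_store("/tmp/my_archive_store", cache_size);
}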
