Skip to content

Commit

Permalink
adjust table configs
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jan 14, 2024
1 parent 5e67089 commit 6770da6
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 24 deletions.
2 changes: 2 additions & 0 deletions server/src/api/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use actix_web::{
};
use serde_json::json;

// TODO: allow setting dedicated cache size

#[put("/v1/table/{name}")]
pub async fn handler(
path: Path<String>,
Expand Down
3 changes: 3 additions & 0 deletions server/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct AppState {
}

impl AppState {
// TODO: allow setting dedicated block cache per table
// TODO: if Some(...), show cache usage in table list PER table

pub async fn create_table(&self, table_name: &str) -> fjall::Result<MonitoredSmoltable> {
let mut tables = self.tables.write().await;

Expand Down
5 changes: 4 additions & 1 deletion server/src/env.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::path::PathBuf;

/// Gets data folder
pub fn data_folder() -> PathBuf {
let data_folder = std::env::var("SMOLTABLE_DATA").unwrap_or(".smoltable_data".into());
PathBuf::from(&data_folder)
}

/// Gets HTTP port
pub fn get_port() -> u16 {
let port = std::env::var("PORT")
.or_else(|_| std::env::var("SMOLTABLE_PORT"))
Expand All @@ -13,8 +15,9 @@ pub fn get_port() -> u16 {
port.parse::<u16>().expect("invalid port")
}

/// Metrics data cap *per metrics table*
pub fn metrics_cap_mb() -> u16 {
let port = std::env::var("SMOLTABLE_METRICS_CAP_MB").unwrap_or("10".into());
let port = std::env::var("SMOLTABLE_METRICS_CAP_MB").unwrap_or("1".into());

port.parse::<u16>()
.expect("invalid metrics cap MB setting, can be up to 65536")
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ async fn main() -> fjall::Result<()> {
let port = get_port();

let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes(
/* 32 MiB */ 32 * 1_024 * 1_024,
/* 16 MiB */ 16 * 1_024 * 1_024,
));

let keyspace = fjall::Config::new(data_folder())
Expand Down
2 changes: 1 addition & 1 deletion server/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ impl ManifestTable {
tree.set_max_memtable_size(/* 512 KiB */ 512 * 1_024);

tree.set_compaction_strategy(Arc::new(fjall::compaction::Levelled {
target_size: /* 512 KiB */ 512 * 1_024,
l0_threshold: 2,
target_size: 512 * 1_024,
}));

#[cfg(debug_assertions)]
Expand Down
2 changes: 2 additions & 0 deletions server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ impl MetricsTable {
)),
)?;

table.tree.set_max_memtable_size(/* 1 MiB*/ 1_024 * 1_024);

table.create_column_families(&CreateColumnFamilyInput {
column_families: vec![ColumnFamilyDefinition {
name: "value".into(),
Expand Down
75 changes: 54 additions & 21 deletions smoltable/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,27 @@ impl Smoltable {
strategy: Arc<dyn fjall::compaction::Strategy + Send + Sync>,
) -> fjall::Result<Smoltable> {
let manifest = {
let config = fjall::PartitionCreateOptions::default().level_count(2);
let config = fjall::PartitionCreateOptions::default()
.level_count(2)
.level_ratio(2);

let tree = keyspace.open_partition(&format!("_man_{name}"), config)?;

tree.set_max_memtable_size(/* 512 KiB */ 512 * 1_024);

tree.set_compaction_strategy(Arc::new(fjall::compaction::Levelled {
target_size: 4 * 1_024 * 1_024,
target_size: /* 512 KiB */ 512 * 1_024,
l0_threshold: 2,
}));

tree
};

let tree = {
let config = fjall::PartitionCreateOptions::default().block_size(BLOCK_SIZE);
let tree = keyspace.open_partition(&format!("_dat_{name}"), config)?;
tree.set_compaction_strategy(strategy);

tree
};

Expand All @@ -169,6 +177,8 @@ impl Smoltable {

table.load_locality_groups()?;

// TODO: set block cache if defined

Ok(table)
}

Expand Down Expand Up @@ -199,7 +209,7 @@ impl Smoltable {
pub fn list_column_families(&self) -> fjall::Result<Vec<ColumnFamilyDefinition>> {
let items = self
.manifest
.prefix("cf:")
.prefix("cf#")
.into_iter()
.collect::<Result<Vec<_>, fjall::LsmError>>()?;

Expand All @@ -217,15 +227,15 @@ impl Smoltable {
fn load_locality_groups(&self) -> fjall::Result<()> {
let items = self
.manifest
.prefix("lg:")
.prefix("lg#")
.into_iter()
.collect::<Result<Vec<_>, fjall::LsmError>>()?;

let items = items
.into_iter()
.map(|(key, value)| {
let key = std::str::from_utf8(&key).expect("should be utf-8");
let id = key.split(':').nth(1).expect("should have ID");
let id = key.split('#').nth(1).expect("should have ID");

let value = std::str::from_utf8(&value).expect("should be utf-8");

Expand All @@ -236,10 +246,19 @@ impl Smoltable {
Ok(LocalityGroup {
id: id.into(),
column_families,
tree: self.keyspace.open_partition(
&format!("_lg_{id}"),
fjall::PartitionCreateOptions::default().block_size(BLOCK_SIZE),
)?,
tree: {
let tree = self.keyspace.open_partition(
&format!("_lg_{id}"),
fjall::PartitionCreateOptions::default().block_size(BLOCK_SIZE),
)?;

tree.set_compaction_strategy(Arc::new(fjall::compaction::Levelled {
target_size: 128 * 1_024 * 1_024,
l0_threshold: 8,
}));

tree
},
})
})
.collect::<fjall::Result<Vec<_>>>()?;
Expand All @@ -249,14 +268,37 @@ impl Smoltable {
Ok(())
}

/// Creates a dedicated block cache for the table.
///
/// Will be applied after restart automatically, no need to call after every start.
pub fn set_cache_size(&self, bytes: u64) -> fjall::Result<()> {
log::debug!("Setting block cache with {bytes}B table {:?}", self.name);

self.manifest.insert("cache#bytes", bytes.to_be_bytes())?;

// TODO: create block cache and apply to all partitions

self.keyspace.persist()?;

Ok(())
}

/// Creates column families.
///
/// Will be persisted, no need to call after every restart.
pub fn create_column_families(&self, input: &CreateColumnFamilyInput) -> fjall::Result<()> {
log::debug!("Creating column families for table");
log::debug!(
"Creating {} column families (locality: {}) for table {:?}",
input.column_families.len(),
input.locality_group.unwrap_or_default(),
self.name
);

let mut batch = self.keyspace.batch();

for item in &input.column_families {
let str = serde_json::to_string(&item).expect("should serialize");
batch.insert(&self.manifest, format!("cf:{}", item.name), str);
batch.insert(&self.manifest, format!("cf#{}", item.name), str);
}

let locality_group_id = nanoid::nanoid!();
Expand All @@ -269,7 +311,7 @@ impl Smoltable {
.collect();
let str = serde_json::to_string(&names).expect("should serialize");

batch.insert(&self.manifest, format!("lg:{locality_group_id}"), str);
batch.insert(&self.manifest, format!("lg#{locality_group_id}"), str);
}

batch.commit()?;
Expand All @@ -280,15 +322,6 @@ impl Smoltable {
Ok(())
}

/* pub fn from_tree(keyspace: Keyspace, tree: PartitionHandle) -> fjall::Result<Smoltable> {
Ok(Self {
keyspace,
tree,
manifest: keys
locality_groups: vec![],
})
} */

// TODO: count thrashes block cache

// TODO: unit test
Expand Down

0 comments on commit 6770da6

Please sign in to comment.