Skip to content

Commit

Permalink
Break out store implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Sep 3, 2024
1 parent 2ecd661 commit ecf01fa
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 157 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aper/src/aper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
connection::{ClientConnection, MessageToServer},
store::Store,
Mutation, StoreHandle,
store::{Store, StoreHandle},
Mutation,
};
use serde::{Deserialize, Serialize};
use std::{
Expand Down
2 changes: 1 addition & 1 deletion aper/src/data_structures/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ impl<K: Serialize + DeserializeOwned, V: AperSync> Map<K, V> {
pub fn delete(&mut self, key: &K) {
let key = bincode::serialize(key).unwrap();
self.map.delete_child(&key);
}
}
}
162 changes: 10 additions & 152 deletions aper/src/store.rs → aper/src/store/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use super::{
handle::StoreHandle,
prefix_map::{PrefixMap, PrefixMapValue},
};
use crate::{
listener::{self, ListenerMap},
Bytes, Mutation,
Expand All @@ -10,62 +14,17 @@ use std::{
sync::{Arc, Mutex},
};

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub enum PrefixMapValue {
Value(Bytes),
Deleted,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum PrefixMap {
Children(BTreeMap<Bytes, PrefixMapValue>),
DeletedPrefixMap,
}

impl PrefixMap {
fn get(&self, key: &Bytes) -> Option<PrefixMapValue> {
match self {
PrefixMap::Children(children) => children.get(key).cloned(),
PrefixMap::DeletedPrefixMap => Some(PrefixMapValue::Deleted),
}
}

fn insert(&mut self, key: Bytes, value: PrefixMapValue) {
match self {
PrefixMap::Children(children) => {
children.insert(key, value);
}
PrefixMap::DeletedPrefixMap => {
if value == PrefixMapValue::Deleted {
// the prefix map is deleted, so we don't need to delete the value.
return;
}

let mut new_children = BTreeMap::new();
new_children.insert(key, value);
*self = PrefixMap::Children(new_children);
}
}
}
}

impl Default for PrefixMap {
fn default() -> Self {
Self::Children(BTreeMap::new())
}
}

#[derive(Default)]
pub struct StoreLayer {
/// Map of prefix to direct children at that prefix.
layer: BTreeMap<Vec<Bytes>, PrefixMap>,
pub(crate) layer: BTreeMap<Vec<Bytes>, PrefixMap>,
/// A set of prefixes that have been modified in this layer.
dirty: HashSet<Vec<Bytes>>,
pub(crate) dirty: HashSet<Vec<Bytes>>,
}

pub struct StoreInner {
layers: Mutex<Vec<StoreLayer>>,
listeners: Mutex<ListenerMap>,
pub(crate) layers: Mutex<Vec<StoreLayer>>,
pub(crate) listeners: Mutex<ListenerMap>,
}

impl Default for StoreInner {
Expand All @@ -79,7 +38,7 @@ impl Default for StoreInner {

#[derive(Clone, Default)]
pub struct Store {
inner: Arc<StoreInner>,
pub(crate) inner: Arc<StoreInner>,
}

impl Store {
Expand Down Expand Up @@ -250,108 +209,7 @@ impl Store {
}

pub fn handle(&self) -> StoreHandle {
StoreHandle {
map: self.clone(),
prefix: vec![],
}
}
}

#[derive(Clone)]
pub struct StoreHandle {
map: Store,
prefix: Vec<Bytes>,
}

impl StoreHandle {
pub fn listen<F: Fn() -> bool + 'static + Send + Sync>(&self, listener: F) {
let mut listeners = self.map.inner.listeners.lock().unwrap();
listeners.listen(self.prefix.clone(), listener);
}

pub fn get(&self, key: &Bytes) -> Option<Bytes> {
self.map.get(&self.prefix, key)
}

pub fn set(&mut self, key: Bytes, value: Bytes) {
// set the value in the top layer.

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();

top_layer.dirty.insert(self.prefix.clone());

map.insert(key, PrefixMapValue::Value(value));
}

pub fn delete(&mut self, key: Bytes) {
// delete the value in the top layer.

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();

top_layer.dirty.insert(self.prefix.clone());

map.insert(key, PrefixMapValue::Deleted);
}

pub fn child(&mut self, path_part: &[u8]) -> Self {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());
self.map.ensure(&prefix);
Self {
map: self.map.clone(),
prefix,
}
}

pub fn delete_child(&mut self, path_part: &[u8]) {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

// When we delete a prefix, we delete not only that prefix but all of the prefixes under it.
// TODO: This is a bit expensive, in order to make a trade-off that reads are faster. Is the balance optimal?

let mut prefixes_to_delete = HashSet::new();

for layer in layers.iter() {
for (pfx, val) in layer.layer.iter() {
if pfx.starts_with(&prefix) {
prefixes_to_delete.insert(pfx.clone());
}
}
}

let mut top_layer = layers.last_mut().unwrap();

for pfx in prefixes_to_delete.iter() {
top_layer
.layer
.insert(pfx.clone(), PrefixMap::DeletedPrefixMap);
top_layer.dirty.insert(pfx.clone());
}
}
}

impl Debug for Store {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let layers = self.inner.layers.lock().unwrap();

for (i, layer) in layers.iter().enumerate() {
writeln!(f, "Layer {}", i)?;
for (prefix, map) in layer.layer.iter() {
writeln!(f, " {:?} -> {:?}", prefix, map)?;
}
}

Ok(())
StoreHandle::new(self.clone())
}
}

Expand Down
120 changes: 120 additions & 0 deletions aper/src/store/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use super::{
core::Store,
prefix_map::{PrefixMap, PrefixMapValue},
};
use crate::{
listener::{self, ListenerMap},
Bytes, Mutation,
};
use serde::{Deserialize, Serialize};
use std::{
cell::RefCell,
collections::{BTreeMap, HashSet},
fmt::{Debug, Formatter},
sync::{Arc, Mutex},
};

#[derive(Clone)]
pub struct StoreHandle {
map: Store,
prefix: Vec<Bytes>,
}

impl StoreHandle {
pub fn new(map: Store) -> Self {
Self {
map,
prefix: vec![],
}
}

pub fn listen<F: Fn() -> bool + 'static + Send + Sync>(&self, listener: F) {
let mut listeners = self.map.inner.listeners.lock().unwrap();
listeners.listen(self.prefix.clone(), listener);
}

pub fn get(&self, key: &Bytes) -> Option<Bytes> {
self.map.get(&self.prefix, key)
}

pub fn set(&mut self, key: Bytes, value: Bytes) {
// set the value in the top layer.

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();

top_layer.dirty.insert(self.prefix.clone());

map.insert(key, PrefixMapValue::Value(value));
}

pub fn delete(&mut self, key: Bytes) {
// delete the value in the top layer.

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();

top_layer.dirty.insert(self.prefix.clone());

map.insert(key, PrefixMapValue::Deleted);
}

pub fn child(&mut self, path_part: &[u8]) -> Self {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());
self.map.ensure(&prefix);
Self {
map: self.map.clone(),
prefix,
}
}

pub fn delete_child(&mut self, path_part: &[u8]) {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());

let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();

// When we delete a prefix, we delete not only that prefix but all of the prefixes under it.
// TODO: This is a bit expensive, in order to make a trade-off that reads are faster. Is the balance optimal?

let mut prefixes_to_delete = HashSet::new();

for layer in layers.iter() {
for (pfx, val) in layer.layer.iter() {
if pfx.starts_with(&prefix) {
prefixes_to_delete.insert(pfx.clone());
}
}
}

let mut top_layer = layers.last_mut().unwrap();

for pfx in prefixes_to_delete.iter() {
top_layer
.layer
.insert(pfx.clone(), PrefixMap::DeletedPrefixMap);
top_layer.dirty.insert(pfx.clone());
}
}
}

impl Debug for Store {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let layers = self.inner.layers.lock().unwrap();

for (i, layer) in layers.iter().enumerate() {
writeln!(f, "Layer {}", i)?;
for (prefix, map) in layer.layer.iter() {
writeln!(f, " {:?} -> {:?}", prefix, map)?;
}
}

Ok(())
}
}
7 changes: 7 additions & 0 deletions aper/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod core;
mod handle;
mod prefix_map;

pub use core::Store;
pub use handle::StoreHandle;
pub use prefix_map::{PrefixMap, PrefixMapValue};
Loading

0 comments on commit ecf01fa

Please sign in to comment.