From f2196ba9a4689efea1698824dfca0187815d7c49 Mon Sep 17 00:00:00 2001 From: Matthijs van Otterdijk Date: Wed, 5 Apr 2023 11:15:45 +0200 Subject: [PATCH] use less memory in builders (#132) * complete builder rewrite for better memory behavior * dedup while adding triples to the simple builder * use archive store for the builder benchmarks * change loop strategy for iterator --- benches/builder/main.rs | 10 +- src/layer/simple_builder.rs | 525 +++++++++++++++++++----------------- 2 files changed, 285 insertions(+), 250 deletions(-) diff --git a/benches/builder/main.rs b/benches/builder/main.rs index 9557c93b..9a980b67 100644 --- a/benches/builder/main.rs +++ b/benches/builder/main.rs @@ -12,7 +12,7 @@ use data::*; #[bench] fn build_empty_base_layer(b: &mut Bencher) { let dir = tempdir().unwrap(); - let store = terminus_store::open_sync_directory_store(dir.path()); + let store = terminus_store::open_sync_archive_store(dir.path(), 512); b.iter(|| { let builder = store.create_base_layer().unwrap(); @@ -23,7 +23,7 @@ fn build_empty_base_layer(b: &mut Bencher) { #[bench] fn build_base_layer_1000(b: &mut Bencher) { let dir = tempdir().unwrap(); - let store = terminus_store::open_sync_directory_store(dir.path()); + let store = terminus_store::open_sync_archive_store(dir.path(), 512); let seed = b"the quick brown fox jumped over "; let rand = StdRng::from_seed(*seed); @@ -48,7 +48,7 @@ fn build_base_layer_1000(b: &mut Bencher) { #[bench] fn build_empty_child_layer_on_empty_base_layer(b: &mut Bencher) { let dir = tempdir().unwrap(); - let store = terminus_store::open_sync_directory_store(dir.path()); + let store = terminus_store::open_sync_archive_store(dir.path(), 512); let builder = store.create_base_layer().unwrap(); let base_layer = builder.commit().unwrap(); @@ -61,7 +61,7 @@ fn build_empty_child_layer_on_empty_base_layer(b: &mut Bencher) { #[bench] fn build_nonempty_child_layer_on_empty_base_layer(b: &mut Bencher) { let dir = tempdir().unwrap(); - let store = terminus_store::open_sync_directory_store(dir.path()); + let store = terminus_store::open_sync_archive_store(dir.path(), 512); let builder = store.create_base_layer().unwrap(); let base_layer = builder.commit().unwrap(); @@ -88,7 +88,7 @@ fn build_nonempty_child_layer_on_empty_base_layer(b: &mut Bencher) { #[bench] fn build_nonempty_child_layer_on_nonempty_base_layer(b: &mut Bencher) { let dir = tempdir().unwrap(); - let store = terminus_store::open_sync_directory_store(dir.path()); + let store = terminus_store::open_sync_archive_store(dir.path(), 512); let seed = b"the quick brown fox jumped over "; let rand = StdRng::from_seed(*seed); diff --git a/src/layer/simple_builder.rs b/src/layer/simple_builder.rs index 90c4a23e..024a643a 100644 --- a/src/layer/simple_builder.rs +++ b/src/layer/simple_builder.rs @@ -12,15 +12,14 @@ use super::internal::*; use super::layer::*; use crate::storage::*; -use crate::structure::TypedDictEntry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::io; use std::pin::Pin; use std::sync::Arc; use futures::future::Future; -use rayon::prelude::*; +use bitvec::prelude::*; /// A layer builder trait with no generic typing. /// @@ -54,10 +53,16 @@ pub struct SimpleLayerBuilder { name: [u32; 5], parent: Option>, files: LayerFiles, - additions: Vec, id_additions: Vec, - removals: Vec, id_removals: Vec, + + nodes_values_map: HashMap, + predicates_map: HashMap, + nodes_values_map_count: usize, + predicates_map_count: usize, + node_count: usize, + pred_count: usize, + val_count: usize, } impl SimpleLayerBuilder { @@ -67,25 +72,120 @@ impl SimpleLayerBuilder { name, parent: None, files: LayerFiles::Base(files), - additions: Vec::new(), id_additions: Vec::with_capacity(0), - removals: Vec::new(), id_removals: Vec::with_capacity(0), + + nodes_values_map: HashMap::new(), + predicates_map: HashMap::new(), + nodes_values_map_count: 0, + predicates_map_count: 0, + node_count: 0, + pred_count: 0, + val_count: 0, } } /// Construct a layer builder for a child layer pub fn from_parent(name: [u32; 5], parent: Arc, files: ChildLayerFiles) -> Self { + let nodes_values_map_count = parent.node_and_value_count(); + let predicates_map_count = parent.predicate_count(); Self { name, parent: Some(parent), files: LayerFiles::Child(files), - additions: Vec::new(), id_additions: Vec::new(), - removals: Vec::new(), id_removals: Vec::new(), + + nodes_values_map: HashMap::new(), + predicates_map: HashMap::new(), + nodes_values_map_count, + predicates_map_count, + node_count: 0, + pred_count: 0, + val_count: 0, } } + + fn calculate_triple(&mut self, triple: ValueTriple) -> IdTriple { + let subject = ObjectType::Node(triple.subject); + let predicate = triple.predicate; + let object = triple.object; + let subject_id = if let Some(n) = self.nodes_values_map.get(&subject) { + *n + } else { + let node_id = if let Some(node_id) = self + .parent + .as_ref() + .and_then(|p| p.subject_id(subject.node_ref().unwrap())) + { + node_id + } else { + self.nodes_values_map_count += 1; + self.node_count += 1; + self.nodes_values_map_count as u64 + }; + self.nodes_values_map.insert(subject, node_id); + + node_id + }; + + let predicate_id = if let Some(p) = self.predicates_map.get(&predicate) { + *p + } else { + let predicate_id = if let Some(predicate_id) = self + .parent + .as_ref() + .and_then(|p| p.predicate_id(&predicate)) + { + predicate_id + } else { + self.predicates_map_count += 1; + self.pred_count += 1; + self.predicates_map_count as u64 + }; + self.predicates_map.insert(predicate, predicate_id); + + predicate_id + }; + let object_id = if let Some(o) = self.nodes_values_map.get(&object) { + *o + } else { + match object { + ObjectType::Node(n) => { + let node_id = if let Some(node_id) = self + .parent + .as_ref() + .and_then(|p| p.object_node_id(n.as_str())) + { + node_id + } else { + self.nodes_values_map_count += 1; + self.node_count += 1; + self.nodes_values_map_count as u64 + }; + self.nodes_values_map.insert(ObjectType::Node(n), node_id); + + node_id + } + ObjectType::Value(v) => { + let value_id = if let Some(value_id) = + self.parent.as_ref().and_then(|p| p.object_value_id(&v)) + { + value_id + } else { + self.nodes_values_map_count += 1; + self.val_count += 1; + self.nodes_values_map_count as u64 + }; + self.nodes_values_map.insert(ObjectType::Value(v), value_id); + + value_id + } + } + }; + + IdTriple::new(subject_id, predicate_id, object_id) + } } impl LayerBuilder for SimpleLayerBuilder { @@ -97,16 +197,18 @@ impl LayerBuilder for SimpleLayerBuil self.parent.clone() } - fn add_value_triple(&mut self, triple: ValueTriple) { - self.additions.push(triple); + fn add_value_triple(&mut self, addition: ValueTriple) { + let triple = self.calculate_triple(addition); + self.id_additions.push(triple); } fn add_id_triple(&mut self, triple: IdTriple) { self.id_additions.push(triple); } - fn remove_value_triple(&mut self, triple: ValueTriple) { - self.removals.push(triple); + fn remove_value_triple(&mut self, removal: ValueTriple) { + let triple = self.calculate_triple(removal); + self.id_removals.push(triple); } fn remove_id_triple(&mut self, triple: IdTriple) { @@ -118,75 +220,168 @@ impl LayerBuilder for SimpleLayerBuil name: _, parent, files, - additions, - id_additions, - removals, - id_removals, + mut id_additions, + mut id_removals, + + nodes_values_map, + predicates_map, + nodes_values_map_count: _, + predicates_map_count: _, + node_count, + pred_count, + val_count, } = self; + let parent_node_value_offset = parent + .as_ref() + .map(|p| p.node_and_value_count()) + .unwrap_or(0); + let parent_predicate_offset = parent.as_ref().map(|p| p.predicate_count()).unwrap_or(0); + // time to deduplicate! + + id_additions.sort(); + id_additions.dedup(); + id_additions.shrink_to_fit(); + id_removals.sort(); + id_removals.dedup(); + id_removals.shrink_to_fit(); + + // we now need to figure out noops. + let mut additions_it = id_additions.iter_mut().peekable(); + let mut removals_it = id_removals.iter_mut().peekable(); + loop { + let addition = additions_it.peek(); + let removal = removals_it.peek(); + + // advance those iterators in order until we reach the end + if removal.is_none() { + break; + } + if addition.is_none() { + // loop over the remaining removals to nullify everything that should be a noop due to being out of range + for removal in removals_it { + if removal.subject > parent_node_value_offset as u64 + || removal.predicate > parent_predicate_offset as u64 + || removal.object > parent_node_value_offset as u64 + { + *removal = IdTriple::new(0, 0, 0); + } + } + break; + } - let (mut additions, mut removals) = rayon::join( - || { - let mut additions: Vec<_> = match parent.as_ref() { - None => additions - .into_iter() - .map(|triple| triple.to_unresolved()) - .collect(), - Some(parent) => additions - .into_par_iter() - .map(move |triple| parent.value_triple_to_partially_resolved(triple)) - .collect(), - }; - - additions.extend(id_additions.into_iter().map(|triple| triple.to_resolved())); - additions.par_sort_unstable(); - additions.dedup(); - - additions - }, - || { - let mut removals: Vec<_> = match parent.as_ref() { - None => removals - .into_iter() - .map(|triple| triple.to_unresolved()) - .collect(), - Some(parent) => removals - .into_par_iter() - .map(move |triple| parent.value_triple_to_partially_resolved(triple)) - .collect(), - }; - - removals.extend(id_removals.into_iter().map(|triple| triple.to_resolved())); - removals.par_sort_unstable(); - removals.dedup(); - - removals - }, - ); + if addition < removal { + additions_it.next(); + } else if addition > removal { + let removal = removals_it.next().unwrap(); + // we need to clear a potential noop + if removal.subject > parent_node_value_offset as u64 + || removal.predicate > parent_predicate_offset as u64 + || removal.object > parent_node_value_offset as u64 + { + *removal = IdTriple::new(0, 0, 0); + } + } else { + // same triple! make it zeroes to express a no-op without having to shift around triples + let addition = additions_it.next().unwrap(); + let removal = removals_it.next().unwrap(); + *addition = IdTriple::new(0, 0, 0); + *removal = IdTriple::new(0, 0, 0); + } + } + + // some dict entries might now be unused. We need to do an existence check. + let mut node_value_existences = bitvec![0;node_count + val_count]; + let mut predicate_existences = bitvec![0;pred_count]; + for triple in id_additions.iter().chain(id_removals.iter()) { + if triple.subject > parent_node_value_offset as u64 { + node_value_existences + .set(triple.subject as usize - parent_node_value_offset - 1, true); + } + if triple.predicate > parent_predicate_offset as u64 { + predicate_existences.set( + triple.predicate as usize - parent_predicate_offset - 1, + true, + ); + } + if triple.object > parent_node_value_offset as u64 { + node_value_existences + .set(triple.object as usize - parent_node_value_offset - 1, true); + } + } + + // time to collect our dictionaries. + let mut nodes = Vec::with_capacity(node_count); + let mut predicates = Vec::with_capacity(pred_count); + let mut values = Vec::with_capacity(val_count); + + for (entry, id) in nodes_values_map.into_iter() { + if id <= parent_node_value_offset as u64 { + // we don't care about these ids. they are already correct in the triples + continue; + } + if !node_value_existences[id as usize - parent_node_value_offset - 1] { + // while originally collected, in the end this entry was unused + continue; + } + match entry { + ObjectType::Node(n) => nodes.push((n, id)), + ObjectType::Value(v) => values.push((v, id)), + } + } + for (entry, id) in predicates_map.into_iter() { + if id <= parent_predicate_offset as u64 { + // we don't care about these ids. they are already correct in the triples + continue; + } + if !predicate_existences[id as usize - parent_predicate_offset - 1] { + // while originally collected, in the end this entry was unused + continue; + } - // there's now a sorted list of additions and a sorted list of - // removals, all as resolved as they can possibly be at this - // point. In order to support no-ops (where you add and - // remove the same triple in the same builder), we need to - // cross off the instances that appear in both lists. - // 'crossing off' is accomplished by setting the particular - // triple to (0,0,0), which is understood by the rest of the - // code to mean a no-op. - - zero_equivalents(&mut additions, &mut removals); - - // in addition, all removals that aren't resolved at this - // point are actually no-ops. - if parent.is_some() { - removals - .par_iter_mut() - .for_each(|triple| triple.make_resolved_or_zero()) + predicates.push((entry, id)); } - // collect all strings we don't yet know about - let (unresolved_nodes, unresolved_predicates, unresolved_values) = - collect_unresolved_strings(&additions); + nodes.sort(); + predicates.sort(); + values.sort(); + + // build up conversion maps for converting the id triples to their final id + let mut node_value_id_map = vec![0_u64; node_count + val_count]; + let mut predicate_id_map = vec![0_u64; pred_count]; + for (new_id, (_, old_id)) in nodes.iter().enumerate() { + let mapped_old_id = *old_id as usize - parent_node_value_offset - 1; + node_value_id_map[mapped_old_id] = (new_id + parent_node_value_offset + 1) as u64; + } + for (new_id, (_, old_id)) in values.iter().enumerate() { + let mapped_old_id = *old_id as usize - parent_node_value_offset - 1; + node_value_id_map[mapped_old_id] = + (new_id + parent_node_value_offset + nodes.len() + 1) as u64; + } + for (new_id, (_, old_id)) in predicates.iter().enumerate() { + let mapped_old_id = *old_id as usize - parent_predicate_offset - 1; + predicate_id_map[mapped_old_id] = (new_id + parent_predicate_offset + 1) as u64; + } + + // now we have to map all the additions and removals + for triple in id_additions.iter_mut().chain(id_removals.iter_mut()) { + if triple.subject > parent_node_value_offset as u64 { + let mapped_id = triple.subject as usize - parent_node_value_offset - 1; + triple.subject = node_value_id_map[mapped_id]; + } + if triple.predicate > parent_predicate_offset as u64 { + let mapped_id = triple.predicate as usize - parent_predicate_offset - 1; + triple.predicate = predicate_id_map[mapped_id]; + } + if triple.object > parent_node_value_offset as u64 { + let mapped_id = triple.object as usize - parent_node_value_offset - 1; + triple.object = node_value_id_map[mapped_id]; + } + } + // and resort them + id_additions.sort(); + id_removals.sort(); - // time to build things + // great! everything is now in order. Let's stuff it into an actual builder Box::pin(async { match parent { Some(parent) => { @@ -194,44 +389,15 @@ impl LayerBuilder for SimpleLayerBuil let mut builder = ChildLayerFileBuilder::from_files(parent.clone(), &files).await?; - let node_ids = builder.add_nodes(unresolved_nodes.clone()); - let predicate_ids = builder.add_predicates(unresolved_predicates.clone()); - let value_ids = builder.add_values(unresolved_values.clone()); + builder.add_nodes(nodes.into_iter().map(|x| x.0)); + builder.add_predicates(predicates.into_iter().map(|x| x.0)); + builder.add_values(values.into_iter().map(|x| x.0)); let mut builder = builder.into_phase2().await?; - let counts = parent.all_counts(); - let parent_node_offset = counts.node_count as u64 + counts.value_count as u64; - let parent_predicate_offset = counts.predicate_count as u64; - let mut node_map = HashMap::new(); - for (node, id) in unresolved_nodes.into_iter().zip(node_ids) { - node_map.insert(node, id + parent_node_offset); - } - let mut predicate_map = HashMap::new(); - for (predicate, id) in unresolved_predicates.into_iter().zip(predicate_ids) { - predicate_map.insert(predicate, id + parent_predicate_offset); - } - let mut value_map = HashMap::new(); - for (value, id) in unresolved_values.into_iter().zip(value_ids) { - value_map.insert(value, id + parent_node_offset + node_map.len() as u64); - } + builder.add_id_triples(id_additions).await?; + builder.remove_id_triples(id_removals).await?; - let mut add_triples: Vec<_> = additions - .into_iter() - .map(|t| { - t.resolve_with(&node_map, &predicate_map, &value_map) - .expect("triple should have been resolvable") - }) - .collect(); - add_triples.par_sort_unstable(); - let remove_triples: Vec<_> = removals - .into_iter() - .filter_map(|r| r.as_resolved()) - .collect(); - - // TODO this should be in parallel - builder.add_id_triples(add_triples).await?; - builder.remove_id_triples(remove_triples).await?; builder.finalize().await } None => { @@ -239,35 +405,14 @@ impl LayerBuilder for SimpleLayerBuil let files = files.into_base(); let mut builder = BaseLayerFileBuilder::from_files(&files).await?; - let node_ids = builder.add_nodes(unresolved_nodes.clone()); - let predicate_ids = builder.add_predicates(unresolved_predicates.clone()); - let value_ids = builder.add_values(unresolved_values.clone()); + builder.add_nodes(nodes.into_iter().map(|x| x.0)); + builder.add_predicates(predicates.into_iter().map(|x| x.0)); + builder.add_values(values.into_iter().map(|x| x.0)); let mut builder = builder.into_phase2().await?; - let mut node_map = HashMap::new(); - for (node, id) in unresolved_nodes.into_iter().zip(node_ids) { - node_map.insert(node, id); - } - let mut predicate_map = HashMap::new(); - for (predicate, id) in unresolved_predicates.into_iter().zip(predicate_ids) { - predicate_map.insert(predicate, id); - } - let mut value_map = HashMap::new(); - for (value, id) in unresolved_values.into_iter().zip(value_ids) { - value_map.insert(value, id + node_map.len() as u64); - } + builder.add_id_triples(id_additions).await?; - let mut add_triples: Vec<_> = additions - .into_iter() - .map(|t| { - t.resolve_with(&node_map, &predicate_map, &value_map) - .expect("triple should have been resolvable") - }) - .collect(); - add_triples.par_sort_unstable(); - - builder.add_id_triples(add_triples).await?; builder.finalize().await } } @@ -280,116 +425,6 @@ impl LayerBuilder for SimpleLayerBuil } } -fn zero_equivalents( - additions: &mut [PartiallyResolvedTriple], - removals: &mut [PartiallyResolvedTriple], -) { - let mut removals_iter = removals.iter_mut().peekable(); - 'outer: for mut addition in additions { - let mut next = removals_iter.peek(); - if next == None { - break; - } - - if next < Some(&addition) { - loop { - removals_iter.next().unwrap(); - next = removals_iter.peek(); - - if next == None { - break 'outer; - } else if next >= Some(&addition) { - break; - } - } - } - - if next == Some(&addition) { - let mut removal = removals_iter.next().unwrap(); - addition.subject = PossiblyResolved::Resolved(0); - addition.predicate = PossiblyResolved::Resolved(0); - addition.object = PossiblyResolved::Resolved(0); - - removal.subject = PossiblyResolved::Resolved(0); - removal.predicate = PossiblyResolved::Resolved(0); - removal.object = PossiblyResolved::Resolved(0); - } - } -} - -fn collect_unresolved_strings( - triples: &[PartiallyResolvedTriple], -) -> (Vec, Vec, Vec) { - let (unresolved_nodes, (unresolved_predicates, unresolved_values)) = rayon::join( - || { - let unresolved_nodes_set: HashSet<_> = triples - .par_iter() - .filter_map(|triple| { - let subject = match triple.subject.is_resolved() { - true => None, - false => Some(triple.subject.as_ref().unwrap_unresolved().to_owned()), - }; - let object = match triple.object.is_resolved() { - true => None, - false => match triple.object.as_ref().unwrap_unresolved() { - ObjectType::Node(node) => Some(node.to_owned()), - _ => None, - }, - }; - - match (subject, object) { - (Some(subject), Some(object)) => Some(vec![subject, object]), - (Some(subject), _) => Some(vec![subject]), - (_, Some(object)) => Some(vec![object]), - _ => None, - } - }) - .flatten() - .collect(); - - let mut unresolved_nodes: Vec<_> = unresolved_nodes_set.into_iter().collect(); - unresolved_nodes.par_sort_unstable(); - - unresolved_nodes - }, - || { - rayon::join( - || { - let unresolved_predicates_set: HashSet<_> = triples - .par_iter() - .filter_map(|triple| match triple.predicate.is_resolved() { - true => None, - false => Some(triple.predicate.as_ref().unwrap_unresolved().to_owned()), - }) - .collect(); - let mut unresolved_predicates: Vec<_> = - unresolved_predicates_set.into_iter().collect(); - unresolved_predicates.par_sort_unstable(); - - unresolved_predicates - }, - || { - let unresolved_values_set: HashSet<_> = triples - .par_iter() - .filter_map(|triple| match triple.object.is_resolved() { - true => None, - false => match triple.object.as_ref().unwrap_unresolved() { - ObjectType::Value(value) => Some(value.to_owned()), - _ => None, - }, - }) - .collect(); - let mut unresolved_values: Vec<_> = unresolved_values_set.into_iter().collect(); - unresolved_values.par_sort_unstable(); - unresolved_values - }, - ) - }, - ); - - (unresolved_nodes, unresolved_predicates, unresolved_values) -} - #[cfg(test)] mod tests { use super::*;