Read trees from GPU within scope of building threads.
porcuquine authored and dignifiedquire committed Nov 18, 2020
1 parent 4560927 commit 43a383f
Showing 1 changed file with 103 additions and 95 deletions.
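What changed: the writer channel and the receive-and-persist loop move inside the `rayon::scope` that owns the tree-building thread, so data read back from the GPU is consumed and written to disk while the building threads are still in scope. Since the loop now runs inside the scope closure, which returns `()`, the fallible `writer_rx.recv()?` becomes `writer_rx.recv().expect(...)`. Below is a minimal sketch of the resulting pattern with toy integer batches in place of the `Vec<Fr>` tree data; it assumes only the `rayon` and `std::sync::mpsc` APIs already used by the diff and is illustrative, not the project's code.

use std::sync::mpsc;

fn main() {
    let batches = vec![vec![1u64, 2, 3], vec![4, 5, 6]];
    let batch_count = batches.len();

    // Capacity 0 makes this a rendezvous channel: every `send` blocks
    // until the consumer is ready, so at most one finished result is
    // ever held in flight.
    let (writer_tx, writer_rx) = mpsc::sync_channel::<Vec<u64>>(0);

    rayon::scope(|s| {
        // Producer: stands in for the thread that drains the GPU builder.
        s.spawn(move |_| {
            for batch in batches {
                writer_tx.send(batch).expect("failed to send batch");
            }
        });

        // Consumer, inside the same scope. `?` is unavailable here
        // because the scope closure returns `()`, hence `expect`.
        for _ in 0..batch_count {
            let data = writer_rx.recv().expect("failed to receive batch");
            println!("received {} elements", data.len());
        }
    });
}

The scope cannot exit until the spawned producer finishes, and the producer cannot finish until every batch has been received, so the pairing is deadlock-free as long as the receive count matches the send count.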
198 changes: 103 additions & 95 deletions storage-proofs/porep/src/stacked/vanilla/proof.rs
@@ -443,11 +443,12 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr

         // This channel will receive batches of columns and add them to the ColumnTreeBuilder.
         let (builder_tx, builder_rx) = mpsc::sync_channel(0);
-        // This channel will receive the finished tree data to be written to disk.
-        let (writer_tx, writer_rx) = mpsc::sync_channel::<(Vec<Fr>, Vec<Fr>)>(0);
 
         let config_count = configs.len(); // Don't move config into closure below.
         rayon::scope(|s| {
+            // This channel will receive the finished tree data to be written to disk.
+            let (writer_tx, writer_rx) = mpsc::sync_channel::<(Vec<Fr>, Vec<Fr>)>(0);
+
             s.spawn(move |_| {
                 for i in 0..config_count {
                     let mut node_index = 0;
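A note on the context line `let config_count = configs.len(); // Don't move config into closure below.`: the spawned closure is a `move` closure, so reading `configs.len()` inside it would move the entire collection into the closure, and the writer loop that now also lives in the scope could no longer borrow `&configs`. Copying the `usize` out first sidesteps that. A toy sketch of the same situation (illustrative names, assuming the `rayon` crate):

fn main() {
    let configs = vec!["tree-c-0", "tree-c-1"];

    // A usize is Copy, so the move closure captures the copy and
    // leaves `configs` untouched. Using `configs.len()` inside the
    // closure instead would move `configs` and break the loop below.
    let config_count = configs.len();

    rayon::scope(|s| {
        s.spawn(move |_| {
            for i in 0..config_count {
                println!("building tree {}", i);
            }
        });

        // Still legal: `configs` was never moved.
        for config in &configs {
            println!("writing {}", config);
        }
    });
}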
@@ -560,64 +561,68 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
.expect("failed to send base_data, tree_data");
}
});
});

for config in &configs {
let (base_data, tree_data) = writer_rx.recv()?;
let tree_len = base_data.len() + tree_data.len();

assert_eq!(base_data.len(), nodes_count);
assert_eq!(tree_len, config.size.expect("config size failure"));

// Persist the base and tree data to disk based using the current store config.
let tree_c_store = DiskStore::<<Tree::Hasher as Hasher>::Domain>::new_with_config(
tree_len,
Tree::Arity::to_usize(),
config.clone(),
)
.expect("failed to create DiskStore for base tree data");

let store = Arc::new(RwLock::new(tree_c_store));
let batch_size = std::cmp::min(base_data.len(), column_write_batch_size);
let flatten_and_write_store = |data: &Vec<Fr>, offset| {
data.into_par_iter()
.chunks(column_write_batch_size)
.enumerate()
.try_for_each(|(index, fr_elements)| {
let mut buf = Vec::with_capacity(batch_size * NODE_SIZE);

for fr in fr_elements {
buf.extend(fr_into_bytes(&fr));
}
store
.write()
.expect("failed to access store for write")
.copy_from_slice(&buf[..], offset + (batch_size * index))
})
};
for config in &configs {
let (base_data, tree_data) = writer_rx
.recv()
.expect("failed to receive base_data, tree_data for tree_c");
let tree_len = base_data.len() + tree_data.len();

trace!(
"flattening tree_c base data of {} nodes using batch size {}",
base_data.len(),
batch_size
);
flatten_and_write_store(&base_data, 0).expect("failed to flatten and write store");
trace!("done flattening tree_c base data");

let base_offset = base_data.len();
trace!("flattening tree_c tree data of {} nodes using batch size {} and base offset {}", tree_data.len(), batch_size, base_offset);
flatten_and_write_store(&tree_data, base_offset)
.expect("failed to flatten and write store");
trace!("done flattening tree_c tree data");

trace!("writing tree_c store data");
store
.write()
.expect("failed to access store for sync")
.sync()
.expect("store sync failure");
trace!("done writing tree_c store data");
}
assert_eq!(base_data.len(), nodes_count);
assert_eq!(tree_len, config.size.expect("config size failure"));

// Persist the base and tree data to disk based using the current store config.
let tree_c_store =
DiskStore::<<Tree::Hasher as Hasher>::Domain>::new_with_config(
tree_len,
Tree::Arity::to_usize(),
config.clone(),
)
.expect("failed to create DiskStore for base tree data");

let store = Arc::new(RwLock::new(tree_c_store));
let batch_size = std::cmp::min(base_data.len(), column_write_batch_size);
let flatten_and_write_store = |data: &Vec<Fr>, offset| {
data.into_par_iter()
.chunks(column_write_batch_size)
.enumerate()
.try_for_each(|(index, fr_elements)| {
let mut buf = Vec::with_capacity(batch_size * NODE_SIZE);

for fr in fr_elements {
buf.extend(fr_into_bytes(&fr));
}
store
.write()
.expect("failed to access store for write")
.copy_from_slice(&buf[..], offset + (batch_size * index))
})
};

trace!(
"flattening tree_c base data of {} nodes using batch size {}",
base_data.len(),
batch_size
);
flatten_and_write_store(&base_data, 0)
.expect("failed to flatten and write store");
trace!("done flattening tree_c base data");

let base_offset = base_data.len();
trace!("flattening tree_c tree data of {} nodes using batch size {} and base offset {}", tree_data.len(), batch_size, base_offset);
flatten_and_write_store(&tree_data, base_offset)
.expect("failed to flatten and write store");
trace!("done flattening tree_c tree data");

trace!("writing tree_c store data");
store
.write()
.expect("failed to access store for sync")
.sync()
.expect("store sync failure");
trace!("done writing tree_c store data");
}
});

create_disk_tree::<
DiskTree<Tree::Hasher, Tree::Arity, Tree::SubTreeArity, Tree::TopTreeArity>,
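`flatten_and_write_store` above serializes batches of field elements in parallel: each chunk of `column_write_batch_size` elements is flattened into one byte buffer and written at node offset `offset + batch_size * index`, so chunks may finish out of order yet still land in the right place. A rough sketch of that batching scheme with stand-in types (a locked byte vector instead of the `DiskStore`, fixed 32-byte nodes in place of `Fr` and `fr_into_bytes`); this is not the real storage API:

use rayon::prelude::*;
use std::sync::RwLock;

const NODE_SIZE: usize = 32; // bytes per serialized node, as in the diff

fn flatten_and_write(
    data: &[[u8; NODE_SIZE]], // stand-in for &Vec<Fr>
    offset: usize,            // node offset, not a byte offset
    batch_size: usize,
    store: &RwLock<Vec<u8>>,  // stand-in for Arc<RwLock<DiskStore>>
) {
    data.par_chunks(batch_size)
        .enumerate()
        .for_each(|(index, elements)| {
            // One buffer per chunk keeps the write lock held briefly.
            let mut buf = Vec::with_capacity(batch_size * NODE_SIZE);
            for el in elements {
                buf.extend_from_slice(el); // fr_into_bytes in the real code
            }
            // Chunk `index` starts `batch_size * index` nodes past `offset`.
            let start = (offset + batch_size * index) * NODE_SIZE;
            let mut guard = store.write().expect("store lock poisoned");
            guard[start..start + buf.len()].copy_from_slice(&buf);
        });
}

fn main() {
    let data = vec![[7u8; NODE_SIZE]; 10];
    // The caller pre-sizes the store, just as the DiskStore is created
    // with `tree_len` up front in the diff.
    let store = RwLock::new(vec![0u8; data.len() * NODE_SIZE]);
    flatten_and_write(&data, 0, 4, &store);
    assert!(store.read().unwrap().iter().all(|&b| b == 7));
}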
Expand Down Expand Up @@ -721,12 +726,13 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr

         // This channel will receive batches of leaf nodes and add them to the TreeBuilder.
         let (builder_tx, builder_rx) = mpsc::sync_channel::<(Vec<Fr>, bool)>(0);
-        // This channel will receive the finished tree data to be written to disk.
-        let (writer_tx, writer_rx) = mpsc::sync_channel::<Vec<Fr>>(0);
         let config_count = configs.len(); // Don't move config into closure below.
         let configs = &configs;
         let tree_r_last_config = &tree_r_last_config;
         rayon::scope(|s| {
+            // This channel will receive the finished tree data to be written to disk.
+            let (writer_tx, writer_rx) = mpsc::sync_channel::<Vec<Fr>>(0);
+
             s.spawn(move |_| {
                 for i in 0..config_count {
                     let mut node_index = 0;
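Unlike the tree_c channels, the builder channel here carries `(Vec<Fr>, bool)` rather than plain batches; the consuming side is elided from this diff, so read the flag as presumably marking the final leaf batch of the current tree. A generic sketch of such a flagged-batch protocol with toy data (the flag's meaning is an assumption, and this is not the project's consumer):

fn main() {
    use std::sync::mpsc;

    // Stand-in for the (Vec<Fr>, bool) leaf channel.
    let (builder_tx, builder_rx) = mpsc::sync_channel::<(Vec<u64>, bool)>(0);

    rayon::scope(|s| {
        s.spawn(move |_| {
            builder_tx.send((vec![1, 2], false)).expect("send failed");
            builder_tx.send((vec![3], true)).expect("send failed");
        });

        // Accumulate batches until the final one is flagged.
        let mut leaves = Vec::new();
        loop {
            let (batch, is_last) = builder_rx.recv().expect("recv failed");
            leaves.extend(batch);
            if is_last {
                break;
            }
        }
        assert_eq!(leaves, vec![1, 2, 3]);
    });
}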
@@ -822,45 +828,47 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
                     writer_tx.send(tree_data).expect("failed to send tree_data");
                 }
             });
-        });
 
-        for config in configs.iter() {
-            let tree_data = writer_rx.recv()?;
-
-            let tree_data_len = tree_data.len();
-            let cache_size = get_merkle_tree_cache_size(
-                get_merkle_tree_leafs(
-                    config.size.expect("config size failure"),
-                    Tree::Arity::to_usize(),
-                )
-                .expect("failed to get merkle tree leaves"),
-                Tree::Arity::to_usize(),
-                config.rows_to_discard,
-            )
-            .expect("failed to get merkle tree cache size");
-            assert_eq!(tree_data_len, cache_size);
-
-            let flat_tree_data: Vec<_> = tree_data
-                .into_par_iter()
-                .flat_map(|el| fr_into_bytes(&el))
-                .collect();
-
-            // Persist the data to the store based on the current config.
-            let tree_r_last_path = StoreConfig::data_path(&config.path, &config.id);
-            trace!(
-                "persisting tree r of len {} with {} rows to discard at path {:?}",
-                tree_data_len,
-                config.rows_to_discard,
-                tree_r_last_path
-            );
-            let mut f = OpenOptions::new()
-                .create(true)
-                .write(true)
-                .open(&tree_r_last_path)
-                .expect("failed to open file for tree_r_last");
-            f.write_all(&flat_tree_data)
-                .expect("failed to write tree_r_last data");
-        }
+            for config in configs.iter() {
+                let tree_data = writer_rx
+                    .recv()
+                    .expect("failed to receive tree_data for tree_r_last");
+
+                let tree_data_len = tree_data.len();
+                let cache_size = get_merkle_tree_cache_size(
+                    get_merkle_tree_leafs(
+                        config.size.expect("config size failure"),
+                        Tree::Arity::to_usize(),
+                    )
+                    .expect("failed to get merkle tree leaves"),
+                    Tree::Arity::to_usize(),
+                    config.rows_to_discard,
+                )
+                .expect("failed to get merkle tree cache size");
+                assert_eq!(tree_data_len, cache_size);
+
+                let flat_tree_data: Vec<_> = tree_data
+                    .into_par_iter()
+                    .flat_map(|el| fr_into_bytes(&el))
+                    .collect();
+
+                // Persist the data to the store based on the current config.
+                let tree_r_last_path = StoreConfig::data_path(&config.path, &config.id);
+                trace!(
+                    "persisting tree r of len {} with {} rows to discard at path {:?}",
+                    tree_data_len,
+                    config.rows_to_discard,
+                    tree_r_last_path
+                );
+                let mut f = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .open(&tree_r_last_path)
+                    .expect("failed to open file for tree_r_last");
+                f.write_all(&flat_tree_data)
+                    .expect("failed to write tree_r_last data");
+            }
+        });
     } else {
         info!("generating tree r last using the CPU");
         let size = Store::len(last_layer_labels);
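The `assert_eq!(tree_data_len, cache_size)` in the final hunk checks that the GPU handed back exactly the slice of tree_r_last that gets persisted: `get_merkle_tree_leafs` recovers the leaf count from the configured store size, and `get_merkle_tree_cache_size` counts the rows that remain once the leaf row and the `rows_to_discard` rows directly above it are left out. The arithmetic below is my reading of those helpers' semantics for a full arity-a tree, not the merkletree crate's code (the real helpers validate their inputs and return `Result`):

/// Total nodes in a full arity-`a` tree over `leafs` leaves:
/// (a * leafs - 1) / (a - 1).
fn tree_size(leafs: usize, a: usize) -> usize {
    let mut row = leafs;
    let mut total = leafs;
    while row > 1 {
        row /= a;
        total += row;
    }
    total
}

/// Inverse of `tree_size`, mirroring how the diff turns
/// `config.size` back into a leaf count.
fn tree_leafs(size: usize, a: usize) -> usize {
    (size * (a - 1) + 1) / a
}

/// Nodes kept when the leaf row and `rows_to_discard` rows above it
/// are not persisted -- my reading of what the assert checks.
fn cache_size(leafs: usize, a: usize, rows_to_discard: usize) -> usize {
    let mut row = leafs;
    let mut kept = 0;
    let mut depth = 0;
    while row > 1 {
        row /= a;
        depth += 1;
        if depth > rows_to_discard {
            kept += row;
        }
    }
    kept
}

fn main() {
    // Arity-8 tree over 4096 leaves: 4096 + 512 + 64 + 8 + 1 = 4681 nodes.
    assert_eq!(tree_size(4096, 8), 4681);
    assert_eq!(tree_leafs(4681, 8), 4096);
    // Discarding the two rows above the leaves keeps 8 + 1 = 9 nodes.
    assert_eq!(cache_size(4096, 8, 2), 9);
}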
