Save on allocations by using fixed size types for database rows #1043

Merged · 12 commits · Jul 13, 2024
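The core idea of this PR: database rows were previously `Row = Box<[u8]>`, so every indexed row paid for its own heap allocation behind a fat pointer. With fixed-size array types (`[u8; N]`), rows are stored inline, a whole batch lives in one contiguous `Vec` buffer, and iterators can yield rows by value. A minimal sketch of the difference (row contents are made up for illustration; the real row widths live in `src/types.rs`):

```rust
// Sketch: the allocation pattern this PR removes vs. the one it adopts.
fn main() {
    // Before: one heap allocation per row, behind a fat pointer.
    let boxed: Vec<Box<[u8]>> = (0u32..1000)
        .map(|h| h.to_be_bytes().to_vec().into_boxed_slice())
        .collect(); // 1000 small allocations + the Vec's own buffer

    // After: fixed-size rows stored inline; the batch is one allocation.
    let inline: Vec<[u8; 4]> = (0u32..1000).map(|h| h.to_be_bytes()).collect();

    assert_eq!(boxed.len(), inline.len());
}
```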
14 changes: 10 additions & 4 deletions src/chain.rs
@@ -57,11 +57,11 @@ impl Chain {
     }
 
     /// Load the chain from a collection of headers, up to the given tip
-    pub(crate) fn load(&mut self, headers: Vec<BlockHeader>, tip: BlockHash) {
+    pub(crate) fn load(&mut self, headers: impl Iterator<Item = BlockHeader>, tip: BlockHash) {
         let genesis_hash = self.headers[0].0;
 
         let header_map: HashMap<BlockHash, BlockHeader> =
-            headers.into_iter().map(|h| (h.block_hash(), h)).collect();
+            headers.map(|h| (h.block_hash(), h)).collect();
         let mut blockhash = tip;
         let mut new_headers: Vec<&BlockHeader> = Vec::with_capacity(header_map.len());
         while blockhash != genesis_hash {
@@ -202,7 +202,10 @@

         // test loading from a list of headers and tip
         let mut regtest = Chain::new(Regtest);
-        regtest.load(headers.clone(), headers.last().unwrap().block_hash());
+        regtest.load(
+            headers.iter().copied(),
+            headers.last().unwrap().block_hash(),
+        );
         assert_eq!(regtest.height(), headers.len());
 
         // test getters
@@ -239,7 +242,10 @@

         // test reorg
         let mut regtest = Chain::new(Regtest);
-        regtest.load(headers.clone(), headers.last().unwrap().block_hash());
+        regtest.load(
+            headers.iter().copied(),
+            headers.last().unwrap().block_hash(),
+        );
         let height = regtest.height();
 
         let new_header: BlockHeader = deserialize(&hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a0304d2e55fe0b6415949cff9bca0f88c0717884a5e5797509f89f856af93624a7a6bcc60ffff7f2000000000")).unwrap();
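Taking `impl Iterator<Item = BlockHeader>` instead of `Vec<BlockHeader>` means callers that own a `Vec` no longer need to `clone()` it — they pass `headers.iter().copied()` — while callers that stream rows from the database (see `Index::load` below) avoid collecting at all. A toy sketch of the signature change, using a stand-in `Chain` with copyable 80-byte arrays rather than the real `BlockHeader` type:

```rust
// Sketch: an iterator parameter serves both owned-Vec and streaming callers.
struct Chain {
    count: usize,
}

impl Chain {
    fn load(&mut self, headers: impl Iterator<Item = [u8; 80]>) {
        self.count = headers.count(); // consumed lazily; no intermediate Vec
    }
}

fn main() {
    let headers: Vec<[u8; 80]> = vec![[0u8; 80]; 3];
    let mut chain = Chain { count: 0 };
    chain.load(headers.iter().copied()); // caller keeps ownership of `headers`
    assert_eq!(chain.count, 3);
    assert_eq!(headers.len(), 3); // still usable afterwards
}
```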
137 changes: 96 additions & 41 deletions src/db.rs
@@ -4,15 +4,15 @@ use electrs_rocksdb as rocksdb;
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
 
-pub(crate) type Row = Box<[u8]>;
+use crate::types::{HashPrefix, SerializedHashPrefixRow, SerializedHeaderRow};
 
 #[derive(Default)]
 pub(crate) struct WriteBatch {
-    pub(crate) tip_row: Row,
-    pub(crate) header_rows: Vec<Row>,
-    pub(crate) funding_rows: Vec<Row>,
-    pub(crate) spending_rows: Vec<Row>,
-    pub(crate) txid_rows: Vec<Row>,
+    pub(crate) tip_row: [u8; 32],
+    pub(crate) header_rows: Vec<SerializedHeaderRow>,
+    pub(crate) funding_rows: Vec<SerializedHashPrefixRow>,
+    pub(crate) spending_rows: Vec<SerializedHashPrefixRow>,
+    pub(crate) txid_rows: Vec<SerializedHashPrefixRow>,
 }
 
 impl WriteBatch {
@@ -218,39 +218,50 @@ impl DBStore {
         self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF")
     }
 
-    pub(crate) fn iter_funding(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
+    pub(crate) fn iter_funding(
+        &self,
+        prefix: HashPrefix,
+    ) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
         self.iter_prefix_cf(self.funding_cf(), prefix)
     }
 
-    pub(crate) fn iter_spending(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
+    pub(crate) fn iter_spending(
+        &self,
+        prefix: HashPrefix,
+    ) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
         self.iter_prefix_cf(self.spending_cf(), prefix)
     }
 
-    pub(crate) fn iter_txid(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
+    pub(crate) fn iter_txid(
+        &self,
+        prefix: HashPrefix,
+    ) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
         self.iter_prefix_cf(self.txid_cf(), prefix)
     }
 
+    fn iter_cf<'a, const N: usize>(
+        &'a self,
+        cf: &rocksdb::ColumnFamily,
+        readopts: rocksdb::ReadOptions,
+        prefix: Option<HashPrefix>,
+    ) -> impl Iterator<Item = [u8; N]> + 'a {
+        DBIterator::new(self.db.raw_iterator_cf_opt(cf, readopts), prefix)
+    }
+
     fn iter_prefix_cf(
         &self,
         cf: &rocksdb::ColumnFamily,
-        prefix: Row,
-    ) -> impl Iterator<Item = Row> + '_ {
-        let mode = rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward);
+        prefix: HashPrefix,
+    ) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
         let mut opts = rocksdb::ReadOptions::default();
         opts.set_prefix_same_as_start(true); // requires .set_prefix_extractor() above.
-        self.db
-            .iterator_cf_opt(cf, opts, mode)
-            .map(|row| row.expect("prefix iterator failed").0) // values are empty in prefix-scanned CFs
+        self.iter_cf(cf, opts, Some(prefix))
     }
 
-    pub(crate) fn read_headers(&self) -> Vec<Row> {
+    pub(crate) fn iter_headers(&self) -> impl Iterator<Item = SerializedHeaderRow> + '_ {
         let mut opts = rocksdb::ReadOptions::default();
         opts.fill_cache(false);
-        self.db
-            .iterator_cf_opt(self.headers_cf(), opts, rocksdb::IteratorMode::Start)
-            .map(|row| row.expect("header iterator failed").0) // extract key from row
-            .filter(|key| &key[..] != TIP_KEY) // headers' rows are longer than TIP_KEY
-            .collect()
+        self.iter_cf(self.headers_cf(), opts, None)
     }
 
     pub(crate) fn get_tip(&self) -> Option<Vec<u8>> {
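The `set_prefix_same_as_start(true)` read option only bounds the scan correctly if the column family was opened with a matching prefix extractor, as the inline comment notes. A hedged sketch of that setup (the 8-byte length is an assumption chosen to match `HashPrefix`; electrs aliases `electrs_rocksdb` as `rocksdb`):

```rust
// Sketch: column-family options that make set_prefix_same_as_start(true) safe.
fn cf_options() -> rocksdb::Options {
    let mut opts = rocksdb::Options::default();
    // All keys sharing their first 8 bytes form one prefix domain, so a
    // prefix-bounded iterator can stop at the first key outside that domain.
    opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(8));
    opts
}
```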
@@ -273,7 +284,7 @@ impl DBStore {
         for key in &batch.header_rows {
             db_batch.put_cf(self.headers_cf(), key, b"");
         }
-        db_batch.put_cf(self.headers_cf(), TIP_KEY, &batch.tip_row);
+        db_batch.put_cf(self.headers_cf(), TIP_KEY, batch.tip_row);
 
         let mut opts = rocksdb::WriteOptions::new();
         let bulk_import = self.bulk_import.load(Ordering::Relaxed);
@@ -354,6 +365,57 @@ impl DBStore {
     }
 }
 
+struct DBIterator<'a, const N: usize> {
+    raw: rocksdb::DBRawIterator<'a>,
+    prefix: Option<HashPrefix>,
+    done: bool,
+}
+
+impl<'a, const N: usize> DBIterator<'a, N> {
+    fn new(mut raw: rocksdb::DBRawIterator<'a>, prefix: Option<HashPrefix>) -> Self {
+        match prefix {
+            Some(key) => raw.seek(key),
+            None => raw.seek_to_first(),
+        };
+        Self {
+            raw,
+            prefix,
+            done: false,
+        }
+    }
+}
+
+impl<'a, const N: usize> Iterator for DBIterator<'a, N> {
+    type Item = [u8; N];
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while !self.done {
+            let key = match self.raw.key() {
+                Some(key) => key,
+                None => {
+                    self.raw.status().expect("DB scan failed");
+                    break; // end of scan
+                }
+            };
+            let prefix_match = match self.prefix {
+                Some(key_prefix) => key.starts_with(&key_prefix),
+                None => true,
+            };
+            if !prefix_match {
+                break; // prefix mismatch
+            }
+            let result: Option<[u8; N]> = key.try_into().ok();
+            self.raw.next();
+            match result {
+                Some(value) => return Some(value),
+                None => continue, // skip keys with size != N
+            }
+        }
+        self.done = true;
+        None
+    }
+}
+
 impl Drop for DBStore {
     fn drop(&mut self) {
         info!("closing DB at {}", self.db.path().display());
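Note that the old `read_headers` had to filter out `TIP_KEY` by hand; `DBIterator` gets that for free, because `key.try_into().ok()` returns `None` for any key whose length differs from `N` (the tip key is shorter than a serialized header row). A self-contained sketch of that idiom with made-up keys:

```rust
// Sketch: a fixed-size conversion doubles as a key-length filter.
fn collect_fixed<const N: usize>(keys: &[&[u8]]) -> Vec<[u8; N]> {
    keys.iter()
        .filter_map(|key| (*key).try_into().ok()) // drops keys with len != N
        .collect()
}

fn main() {
    let header = [0u8; 80]; // stand-in for an 80-byte serialized header row
    let keys: &[&[u8]] = &[&header, b"T"]; // a short tip-style key sneaks in
    let rows: Vec<[u8; 80]> = collect_fixed(keys);
    assert_eq!(rows.len(), 1); // the short key was skipped, not returned
}
```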
@@ -424,31 +486,24 @@ mod tests {
         let dir = tempfile::tempdir().unwrap();
         let store = DBStore::open(dir.path(), None, true).unwrap();
 
-        let items: &[&[u8]] = &[
-            b"ab",
-            b"abcdefgh",
-            b"abcdefghj",
-            b"abcdefghjk",
-            b"abcdefghxyz",
-            b"abcdefgi",
-            b"b",
-            b"c",
+        let items = [
+            *b"ab          ",
+            *b"abcdefgh    ",
+            *b"abcdefghj   ",
+            *b"abcdefghjk  ",
+            *b"abcdefghxyz ",
+            *b"abcdefgi    ",
+            *b"b           ",
+            *b"c           ",
         ];
 
         store.write(&WriteBatch {
-            txid_rows: to_rows(items),
+            txid_rows: items.to_vec(),
             ..Default::default()
         });
 
-        let rows = store.iter_txid(b"abcdefgh".to_vec().into_boxed_slice());
-        assert_eq!(rows.collect::<Vec<_>>(), to_rows(&items[1..5]));
-    }
-
-    fn to_rows(values: &[&[u8]]) -> Vec<Box<[u8]>> {
-        values
-            .iter()
-            .map(|v| v.to_vec().into_boxed_slice())
-            .collect()
+        let rows = store.iter_txid(*b"abcdefgh");
+        assert_eq!(rows.collect::<Vec<_>>(), items[1..5]);
     }
 
     #[test]
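Since `txid_rows` is now a `Vec` of a single fixed-size row type, the variable-length test strings above are space-padded to one width (12 bytes here would correspond to an 8-byte hash prefix plus a 4-byte height — an assumption about the serialized layout, not something the diff states). A tiny sketch of a padding helper one could use for such fixtures:

```rust
// Sketch: padding test fixtures to an assumed 12-byte row width.
fn padded(s: &str) -> [u8; 12] {
    let mut row = [b' '; 12]; // space-filled, like the fixtures above
    row[..s.len()].copy_from_slice(s.as_bytes()); // panics if s is too long
    row
}

fn main() {
    assert_eq!(&padded("ab"), b"ab          ");
    assert_eq!(&padded("abcdefghxyz"), b"abcdefghxyz ");
}
```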
31 changes: 15 additions & 16 deletions src/index.rs
@@ -1,13 +1,14 @@
 use anyhow::{Context, Result};
-use bitcoin::consensus::{deserialize, serialize, Decodable};
+use bitcoin::consensus::{deserialize, Decodable, Encodable};
+use bitcoin::hashes::Hash;
 use bitcoin::{BlockHash, OutPoint, Txid};
 use bitcoin_slices::{bsl, Visit, Visitor};
 use std::ops::ControlFlow;
 
 use crate::{
     chain::{Chain, NewHeader},
     daemon::Daemon,
-    db::{DBStore, Row, WriteBatch},
+    db::{DBStore, WriteBatch},
     metrics::{self, Gauge, Histogram, Metrics},
     signals::ExitFlag,
     types::{
@@ -48,8 +49,8 @@ impl Stats {
         self.update_duration.observe_duration(label, f)
     }
 
-    fn observe_size(&self, label: &str, rows: &[Row]) {
-        self.update_size.observe(label, db_rows_size(rows) as f64);
+    fn observe_size<const N: usize>(&self, label: &str, rows: &[[u8; N]]) {
+        self.update_size.observe(label, (rows.len() * N) as f64);
     }
 
     fn observe_batch(&self, batch: &WriteBatch) {
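With every row the same width, the metrics no longer need to walk the batch: the old `db_rows_size` summed individual row lengths, while `rows.len() * N` is constant-time. A trivial equivalence check, assuming rows of some fixed width:

```rust
// Sketch: per-row summation vs. constant-time size for fixed-width rows.
fn main() {
    let rows: Vec<[u8; 12]> = vec![[0u8; 12]; 5];
    let summed: usize = rows.iter().map(|row| row.len()).sum();
    assert_eq!(summed, rows.len() * 12); // 60 bytes either way
}
```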
@@ -101,10 +102,8 @@ impl Index {
         if let Some(row) = store.get_tip() {
             let tip = deserialize(&row).expect("invalid tip");
             let headers = store
-                .read_headers()
-                .into_iter()
-                .map(|row| HeaderRow::from_db_row(&row).header)
-                .collect();
+                .iter_headers()
+                .map(|row| HeaderRow::from_db_row(row).header);
             chain.load(headers, tip);
             chain.drop_last_headers(reindex_last_blocks);
         };
@@ -141,7 +140,7 @@ impl Index {
     pub(crate) fn filter_by_txid(&self, txid: Txid) -> impl Iterator<Item = BlockHash> + '_ {
         self.store
             .iter_txid(TxidRow::scan_prefix(txid))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
+            .map(|row| HashPrefixRow::from_db_row(row).height())
             .filter_map(move |height| self.chain.get_block_hash(height))
     }
 
@@ -151,7 +150,7 @@
     ) -> impl Iterator<Item = BlockHash> + '_ {
         self.store
             .iter_funding(ScriptHashRow::scan_prefix(scripthash))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
+            .map(|row| HashPrefixRow::from_db_row(row).height())
             .filter_map(move |height| self.chain.get_block_hash(height))
     }
 
@@ -161,7 +160,7 @@
     ) -> impl Iterator<Item = BlockHash> + '_ {
         self.store
             .iter_spending(SpendingPrefixRow::scan_prefix(outpoint))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
+            .map(|row| HashPrefixRow::from_db_row(row).height())
             .filter_map(move |height| self.chain.get_block_hash(height))
     }
 
@@ -236,10 +235,6 @@ impl Index {
         }
     }
 
-fn db_rows_size(rows: &[Row]) -> usize {
-    rows.iter().map(|key| key.len()).sum()
-}
-
 fn index_single_block(
     block_hash: BlockHash,
     block: SerBlock,
@@ -292,5 +287,9 @@ fn index_single_block(

     let mut index_block = IndexBlockVisitor { batch, height };
     bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block");
-    batch.tip_row = serialize(&block_hash).into_boxed_slice();
+
+    let len = block_hash
+        .consensus_encode(&mut (&mut batch.tip_row as &mut [u8]))
+        .expect("in-memory writers don't error");
+    debug_assert_eq!(len, BlockHash::LEN);
 }