Skip to content

Commit

Permalink
Add decompression traits and a test case
Browse files Browse the repository at this point in the history
  • Loading branch information
Dentosal committed Oct 4, 2024
1 parent 4061e8c commit fb37322
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 27 deletions.
6 changes: 4 additions & 2 deletions crates/chain-config/src/config/coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<CoinConfig> for TableEntry<Coins> {
/// Generates a new coin config with a unique utxo id for testing
#[derive(Default, Debug)]
pub struct CoinConfigGenerator {
count: usize,
count: u16,
}

impl CoinConfigGenerator {
Expand All @@ -88,10 +88,12 @@ impl CoinConfigGenerator {

pub fn generate(&mut self) -> CoinConfig {
let mut bytes = [0u8; 32];
bytes[..std::mem::size_of::<usize>()].copy_from_slice(&self.count.to_be_bytes());
bytes[..std::mem::size_of_val(&self.count)]
.copy_from_slice(&self.count.to_be_bytes());

let config = CoinConfig {
tx_id: Bytes32::from(bytes),
tx_pointer_tx_idx: self.count,
..Default::default()
};
self.count = self.count.checked_add(1).expect("Max coin count reached");
Expand Down
17 changes: 16 additions & 1 deletion crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ impl CombinedDatabase {
})
}

/// A test-only temporary rocksdb database with given rewind policy.
#[cfg(feature = "rocksdb")]
pub fn from_state_rewind_policy(
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
Ok(Self {
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
off_chain: Database::rocksdb_temp(state_rewind_policy)?,
relayer: Database::rocksdb_temp(state_rewind_policy)?,
gas_price: Default::default(),
})
}

pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
let combined_database = match config.database_type {
#[cfg(feature = "rocksdb")]
Expand All @@ -103,7 +116,9 @@ impl CombinedDatabase {
tracing::warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
CombinedDatabase::default()
CombinedDatabase::from_state_rewind_policy(
config.state_rewind_policy,
)?
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",
Expand Down
13 changes: 7 additions & 6 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,18 @@ where
}

#[cfg(feature = "rocksdb")]
pub fn rocksdb_temp() -> Self {
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
let historical_db =
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
let data = Arc::new(historical_db);
Self::from_storage(DataSource::new(data, Stage::default()))
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
}
}

/// Construct an ephemeral database
/// uses rocksdb when rocksdb features are enabled
/// uses in-memory when rocksdb features are disabled
/// Panics if the database creation fails
impl<Description, Stage> Default for Database<Description, Stage>
where
Description: DatabaseDescription,
Expand All @@ -264,7 +264,8 @@ where
}
#[cfg(feature = "rocksdb")]
{
Self::rocksdb_temp()
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create a temporary database")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

pub mod api_service;
mod da_compression;
pub mod da_compression;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
Expand Down
179 changes: 171 additions & 8 deletions crates/fuel-core/src/graphql_api/da_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ use fuel_core_compression::{
config::Config,
ports::{
EvictorDb,
HistoryLookup,
TemporalRegistry,
UtxoIdToPointer,
},
};
use fuel_core_storage::{
not_found,
tables::{
Coins,
FuelBlocks,
Messages,
},
StorageAsMut,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::block::Block,
Expand Down Expand Up @@ -49,8 +56,8 @@ where
{
let compressed = compress(
config,
CompressTx {
db_tx,
CompressDbTx {
db_tx: DbTx { db_tx },
block_events,
},
block,
Expand All @@ -65,14 +72,23 @@ where
Ok(())
}

struct CompressTx<'a, Tx> {
db_tx: &'a mut Tx,
pub struct DbTx<'a, Tx> {
pub db_tx: &'a mut Tx,
}

struct CompressDbTx<'a, Tx> {
db_tx: DbTx<'a, Tx>,
block_events: &'a [Event],
}

pub struct DecompressDbTx<'a, Tx, Onchain> {
pub db_tx: DbTx<'a, Tx>,
pub onchain_db: Onchain,
}

macro_rules! impl_temporal_registry {
($type:ty) => { paste::paste! {
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand Down Expand Up @@ -150,6 +166,78 @@ macro_rules! impl_temporal_registry {
}
}

impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressTx<'a, Tx, Offchain>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
Expand All @@ -158,7 +246,7 @@ macro_rules! impl_temporal_registry {
&mut self,
key: fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<()> {
self.db_tx
self.db_tx.db_tx
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
.insert(&MetadataKey::$type, &key)?;
Ok(())
Expand All @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
&self,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
Ok(self
.db_tx
.db_tx.db_tx
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
.get(&MetadataKey::$type)?
.map(|v| v.into_owned())
Expand All @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
impl_temporal_registry!(ScriptCode);
impl_temporal_registry!(PredicateCode);

impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand All @@ -210,3 +298,78 @@ where
anyhow::bail!("UtxoId not found in the block events");
}
}

impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
where
Tx: OffChainDatabaseTransaction,
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
{
fn utxo_id(
&self,
c: fuel_core_types::fuel_tx::CompressedUtxoId,
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
if c.tx_pointer.block_height() == 0u32.into() {
// This is a genesis coin, which is handled differently.
// See CoinConfigGenerator::generate which generates the genesis coins.
let mut bytes = [0u8; 32];
let tx_index = c.tx_pointer.tx_index();
bytes[..std::mem::size_of_val(&tx_index)]
.copy_from_slice(&tx_index.to_be_bytes());
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
fuel_core_types::fuel_tx::TxId::from(bytes),
0,
));
}

let block_info = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::FuelBlocks>()
.get(&c.tx_pointer.block_height())?
.ok_or(not_found!(fuel_core_storage::tables::FuelBlocks))?;

let tx_id = *block_info
.transactions()
.get(c.tx_pointer.tx_index() as usize)
.ok_or(anyhow::anyhow!(
"Transaction not found in the block: {:?}",
c.tx_pointer
))?;

Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
}

fn coin(
&self,
utxo_id: fuel_core_types::fuel_tx::UtxoId,
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
let coin = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Coins>()
.get(&dbg!(utxo_id))?
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
Ok(fuel_core_compression::ports::CoinInfo {
owner: *coin.owner(),
asset_id: *coin.asset_id(),
amount: *coin.amount(),
})
}

fn message(
&self,
nonce: fuel_core_types::fuel_types::Nonce,
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
let message = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Messages>()
.get(&nonce)?
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
Ok(fuel_core_compression::ports::MessageInfo {
sender: *message.sender(),
recipient: *message.recipient(),
amount: message.amount(),
data: message.data().clone(),
})
}
}
Loading

0 comments on commit fb37322

Please sign in to comment.