diff --git a/crates/chain-config/src/config/coin.rs b/crates/chain-config/src/config/coin.rs index d24297a54f2..f61d1b474c5 100644 --- a/crates/chain-config/src/config/coin.rs +++ b/crates/chain-config/src/config/coin.rs @@ -78,7 +78,7 @@ impl From for TableEntry { /// Generates a new coin config with a unique utxo id for testing #[derive(Default, Debug)] pub struct CoinConfigGenerator { - count: usize, + count: u16, } impl CoinConfigGenerator { @@ -88,10 +88,12 @@ impl CoinConfigGenerator { pub fn generate(&mut self) -> CoinConfig { let mut bytes = [0u8; 32]; - bytes[..std::mem::size_of::()].copy_from_slice(&self.count.to_be_bytes()); + bytes[..std::mem::size_of_val(&self.count)] + .copy_from_slice(&self.count.to_be_bytes()); let config = CoinConfig { tx_id: Bytes32::from(bytes), + tx_pointer_tx_idx: self.count, ..Default::default() }; self.count = self.count.checked_add(1).expect("Max coin count reached"); diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index a5f527969fd..9ff5f9d8645 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -94,6 +94,19 @@ impl CombinedDatabase { }) } + /// A test-only temporary rocksdb database with given rewind policy. + #[cfg(feature = "rocksdb")] + pub fn from_state_rewind_policy( + state_rewind_policy: StateRewindPolicy, + ) -> DatabaseResult { + Ok(Self { + on_chain: Database::rocksdb_temp(state_rewind_policy)?, + off_chain: Database::rocksdb_temp(state_rewind_policy)?, + relayer: Database::rocksdb_temp(state_rewind_policy)?, + gas_price: Default::default(), + }) + } + pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult { let combined_database = match config.database_type { #[cfg(feature = "rocksdb")] @@ -103,7 +116,9 @@ impl CombinedDatabase { tracing::warn!( "No RocksDB path configured, initializing database with a tmp directory" ); - CombinedDatabase::default() + CombinedDatabase::from_state_rewind_policy( + config.state_rewind_policy, + )? } else { tracing::info!( "Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"", diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 851ebd08b2a..9acd2e5d07f 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -240,18 +240,18 @@ where } #[cfg(feature = "rocksdb")] - pub fn rocksdb_temp() -> Self { - let db = RocksDb::>::default_open_temp(None).unwrap(); - let historical_db = - HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap(); + pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result { + let db = RocksDb::>::default_open_temp(None)?; + let historical_db = HistoricalRocksDB::new(db, rewind_policy)?; let data = Arc::new(historical_db); - Self::from_storage(DataSource::new(data, Stage::default())) + Ok(Self::from_storage(DataSource::new(data, Stage::default()))) } } /// Construct an ephemeral database /// uses rocksdb when rocksdb features are enabled /// uses in-memory when rocksdb features are disabled +/// Panics if the database creation fails impl Default for Database where Description: DatabaseDescription, @@ -264,7 +264,8 @@ where } #[cfg(feature = "rocksdb")] { - Self::rocksdb_temp() + Self::rocksdb_temp(StateRewindPolicy::NoRewind) + .expect("Failed to create a temporary database") } } } diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 8df7d16b3ac..7cc75301bf3 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -8,7 +8,7 @@ use std::{ }; pub mod api_service; -mod da_compression; +pub mod da_compression; pub mod database; pub(crate) mod metrics_extension; pub mod ports; diff --git a/crates/fuel-core/src/graphql_api/da_compression.rs b/crates/fuel-core/src/graphql_api/da_compression.rs index e9d11d1c22e..14ba824eaf8 100644 --- a/crates/fuel-core/src/graphql_api/da_compression.rs +++ b/crates/fuel-core/src/graphql_api/da_compression.rs @@ -14,14 +14,21 @@ use fuel_core_compression::{ config::Config, ports::{ EvictorDb, + HistoryLookup, TemporalRegistry, UtxoIdToPointer, }, }; use fuel_core_storage::{ not_found, + tables::{ + Coins, + FuelBlocks, + Messages, + }, StorageAsMut, StorageAsRef, + StorageInspect, }; use fuel_core_types::{ blockchain::block::Block, @@ -49,8 +56,8 @@ where { let compressed = compress( config, - CompressTx { - db_tx, + CompressDbTx { + db_tx: DbTx { db_tx }, block_events, }, block, @@ -65,14 +72,23 @@ where Ok(()) } -struct CompressTx<'a, Tx> { - db_tx: &'a mut Tx, +pub struct DbTx<'a, Tx> { + pub db_tx: &'a mut Tx, +} + +struct CompressDbTx<'a, Tx> { + db_tx: DbTx<'a, Tx>, block_events: &'a [Event], } +pub struct DecompressDbTx<'a, Tx, Onchain> { + pub db_tx: DbTx<'a, Tx>, + pub onchain_db: Onchain, +} + macro_rules! impl_temporal_registry { ($type:ty) => { paste::paste! { - impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx> + impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx> where Tx: OffChainDatabaseTransaction, { @@ -150,6 +166,78 @@ macro_rules! impl_temporal_registry { } } + impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx> + where + Tx: OffChainDatabaseTransaction, + { + fn read_registry( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<$type> { + self.db_tx.read_registry(key) + } + + fn read_timestamp( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result { + <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) + } + + fn write_registry( + &mut self, + key: &fuel_core_types::fuel_compression::RegistryKey, + value: &$type, + timestamp: Tai64, + ) -> anyhow::Result<()> { + self.db_tx.write_registry(key, value, timestamp) + } + + fn registry_index_lookup( + &self, + value: &$type, + ) -> anyhow::Result> + { + self.db_tx.registry_index_lookup(value) + } + } + + impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressTx<'a, Tx, Offchain> + where + Tx: OffChainDatabaseTransaction, + { + fn read_registry( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<$type> { + self.db_tx.read_registry(key) + } + + fn read_timestamp( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result { + <_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key) + } + + fn write_registry( + &mut self, + key: &fuel_core_types::fuel_compression::RegistryKey, + value: &$type, + timestamp: Tai64, + ) -> anyhow::Result<()> { + self.db_tx.write_registry(key, value, timestamp) + } + + fn registry_index_lookup( + &self, + value: &$type, + ) -> anyhow::Result> + { + self.db_tx.registry_index_lookup(value) + } + } + impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx> where Tx: OffChainDatabaseTransaction, @@ -158,7 +246,7 @@ macro_rules! impl_temporal_registry { &mut self, key: fuel_core_types::fuel_compression::RegistryKey, ) -> anyhow::Result<()> { - self.db_tx + self.db_tx.db_tx .storage_as_mut::() .insert(&MetadataKey::$type, &key)?; Ok(()) @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry { &self, ) -> anyhow::Result> { Ok(self - .db_tx + .db_tx.db_tx .storage_as_ref::() .get(&MetadataKey::$type)? .map(|v| v.into_owned()) @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId); impl_temporal_registry!(ScriptCode); impl_temporal_registry!(PredicateCode); -impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx> +impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx> where Tx: OffChainDatabaseTransaction, { @@ -210,3 +298,78 @@ where anyhow::bail!("UtxoId not found in the block events"); } } + +impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain> +where + Tx: OffChainDatabaseTransaction, + Onchain: StorageInspect + + StorageInspect + + StorageInspect, +{ + fn utxo_id( + &self, + c: fuel_core_types::fuel_tx::CompressedUtxoId, + ) -> anyhow::Result { + if c.tx_pointer.block_height() == 0u32.into() { + // This is a genesis coin, which is handled differently. + // See CoinConfigGenerator::generate which generates the genesis coins. + let mut bytes = [0u8; 32]; + let tx_index = c.tx_pointer.tx_index(); + bytes[..std::mem::size_of_val(&tx_index)] + .copy_from_slice(&tx_index.to_be_bytes()); + return Ok(fuel_core_types::fuel_tx::UtxoId::new( + fuel_core_types::fuel_tx::TxId::from(bytes), + 0, + )); + } + + let block_info = self + .onchain_db + .storage_as_ref::() + .get(&c.tx_pointer.block_height())? + .ok_or(not_found!(fuel_core_storage::tables::FuelBlocks))?; + + let tx_id = *block_info + .transactions() + .get(c.tx_pointer.tx_index() as usize) + .ok_or(anyhow::anyhow!( + "Transaction not found in the block: {:?}", + c.tx_pointer + ))?; + + Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index)) + } + + fn coin( + &self, + utxo_id: fuel_core_types::fuel_tx::UtxoId, + ) -> anyhow::Result { + let coin = self + .onchain_db + .storage_as_ref::() + .get(&dbg!(utxo_id))? + .ok_or(not_found!(fuel_core_storage::tables::Coins))?; + Ok(fuel_core_compression::ports::CoinInfo { + owner: *coin.owner(), + asset_id: *coin.asset_id(), + amount: *coin.amount(), + }) + } + + fn message( + &self, + nonce: fuel_core_types::fuel_types::Nonce, + ) -> anyhow::Result { + let message = self + .onchain_db + .storage_as_ref::() + .get(&nonce)? + .ok_or(not_found!(fuel_core_storage::tables::Messages))?; + Ok(fuel_core_compression::ports::MessageInfo { + sender: *message.sender(), + recipient: *message.recipient(), + amount: message.amount(), + data: message.data().clone(), + }) + } +} diff --git a/tests/tests/da_compression.rs b/tests/tests/da_compression.rs index 7bc3c4e03de..0ec99f526fc 100644 --- a/tests/tests/da_compression.rs +++ b/tests/tests/da_compression.rs @@ -1,7 +1,13 @@ use core::time::Duration; use fuel_core::{ - combined_database::CombinedDatabase, - fuel_core_graphql_api::worker_service::DaCompressionConfig, + chain_config::TESTNET_WALLET_SECRETS, + fuel_core_graphql_api::{ + da_compression::{ + DbTx, + DecompressDbTx, + }, + worker_service::DaCompressionConfig, + }, p2p_test_helpers::*, service::{ Config, @@ -9,11 +15,19 @@ use fuel_core::{ }, }; use fuel_core_client::client::{ + pagination::PaginationRequest, types::TransactionStatus, FuelClient, }; -use fuel_core_compression::VersionedCompressedBlock; +use fuel_core_compression::{ + decompress::decompress, + VersionedCompressedBlock, +}; use fuel_core_poa::signer::SignMode; +use fuel_core_storage::transactional::{ + HistoricalView, + IntoTransaction, +}; use fuel_core_types::{ fuel_asm::{ op, @@ -21,9 +35,11 @@ use fuel_core_types::{ }, fuel_crypto::SecretKey, fuel_tx::{ + Address, GasCosts, Input, TransactionBuilder, + TxPointer, }, secrecy::Secret, }; @@ -31,30 +47,56 @@ use rand::{ rngs::StdRng, SeedableRng, }; +use std::str::FromStr; #[tokio::test] async fn can_fetch_da_compressed_block_from_graphql() { let mut rng = StdRng::seed_from_u64(10); let poa_secret = SecretKey::random(&mut rng); - let db = CombinedDatabase::default(); let mut config = Config::local_node(); config.consensus_signer = SignMode::Key(Secret::new(poa_secret.into())); + config.utxo_validation = true; let compression_config = fuel_core_compression::Config { temporal_registry_retention: Duration::from_secs(3600), }; config.da_compression = DaCompressionConfig::Enabled(compression_config); - let srv = FuelService::from_combined_database(db.clone(), config) - .await - .unwrap(); + let srv = FuelService::new_node(config).await.unwrap(); let client = FuelClient::from(srv.bound_address); + let wallet_secret = + SecretKey::from_str(TESTNET_WALLET_SECRETS[1]).expect("Expected valid secret"); + let wallet_address = Address::from(*wallet_secret.public_key().hash()); + + let coin = client + .coins( + &wallet_address, + None, + PaginationRequest { + cursor: None, + results: 1, + direction: fuel_core_client::client::pagination::PageDirection::Forward, + }, + ) + .await + .expect("Unable to get coins") + .results + .into_iter() + .next() + .expect("Expected at least one coin"); + let tx = TransactionBuilder::script([op::ret(RegId::ONE)].into_iter().collect(), vec![]) .max_fee_limit(0) .script_gas_limit(1_000_000) .with_gas_costs(GasCosts::free()) - .add_random_fee_input() + .add_unsigned_coin_input( + wallet_secret, + coin.utxo_id, + coin.amount, + coin.asset_id, + TxPointer::new(coin.block_created.into(), coin.tx_created_idx), + ) .finalize_as_transaction(); let status = client.submit_and_await_commit(&tx).await.unwrap(); @@ -68,7 +110,19 @@ async fn can_fetch_da_compressed_block_from_graphql() { let block = client.da_compressed_block(block_height).await.unwrap(); let block = block.expect("Unable to get compressed block"); - let _: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); + let block: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); + + let db = &srv.shared.database; + let mut tx_inner = db.off_chain().clone().into_transaction(); + let db_tx = DecompressDbTx { + db_tx: DbTx { + db_tx: &mut tx_inner, + }, + onchain_db: db.on_chain().view_at(&0u32.into()).unwrap(), + }; + let decompressed = decompress(compression_config, db_tx, block).await.unwrap(); + + assert!(decompressed.transactions.len() == 2); } #[tokio::test(flavor = "multi_thread")]