diff --git a/CHANGELOG.md b/CHANGELOG.md index 00fd7c17578..46daf9061d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Added +- [1609](https://github.com/FuelLabs/fuel-core/pull/1609): Add DA compression support. Compressed blocks are stored in the offchain database when blocks are produced, and can be fetched using the GraphQL API. - [2290](https://github.com/FuelLabs/fuel-core/pull/2290): Added a new CLI argument `--graphql-max-directives`. The default value is `10`. - [2195](https://github.com/FuelLabs/fuel-core/pull/2195): Added enforcement of the limit on the size of the L2 transactions per block according to the `block_transaction_size_limit` parameter. - [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool diff --git a/Cargo.lock b/Cargo.lock index d0ead48739d..6448b1b7168 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3149,6 +3149,17 @@ dependencies = [ "strum 0.24.1", ] +[[package]] +name = "fuel-compression" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83d62c1ba6352f7bbcf1f2abcdd1c1092c7f5155653d75243b915c4e9ae500b4" +dependencies = [ + "fuel-derive 0.57.1", + "fuel-types 0.57.1", + "serde", +] + [[package]] name = "fuel-core" version = "0.36.0" @@ -3163,6 +3174,7 @@ dependencies = [ "enum-iterator", "fuel-core", "fuel-core-chain-config", + "fuel-core-compression", "fuel-core-consensus-module", "fuel-core-database", "fuel-core-executor", @@ -3187,6 +3199,7 @@ dependencies = [ "itertools 0.12.1", "mockall", "num_cpus", + "paste", "postcard", "proptest", "rand", @@ -3262,6 +3275,7 @@ dependencies = [ "dotenvy", "fuel-core", "fuel-core-chain-config", + "fuel-core-compression", "fuel-core-poa", "fuel-core-storage", "fuel-core-types 0.36.0", @@ -3344,6 +3358,23 @@ 
dependencies = [ "tokio", ] +[[package]] +name = "fuel-core-compression" +version = "0.36.0" +dependencies = [ + "anyhow", + "fuel-core-compression", + "fuel-core-types 0.36.0", + "paste", + "postcard", + "proptest", + "rand", + "serde", + "strum 0.25.0", + "strum_macros 0.25.3", + "tokio", +] + [[package]] name = "fuel-core-consensus-module" version = "0.36.0" @@ -3669,6 +3700,7 @@ dependencies = [ "fuel-core-benches", "fuel-core-bin", "fuel-core-client", + "fuel-core-compression", "fuel-core-executor", "fuel-core-gas-price-service", "fuel-core-p2p", @@ -3685,6 +3717,7 @@ dependencies = [ "insta", "itertools 0.12.1", "k256", + "postcard", "pretty_assertions", "primitive-types", "proptest", @@ -3969,6 +4002,7 @@ dependencies = [ "derivative", "derive_more", "fuel-asm 0.57.1", + "fuel-compression", "fuel-crypto 0.57.1", "fuel-merkle 0.57.1", "fuel-types 0.57.1", @@ -4050,6 +4084,7 @@ dependencies = [ "derive_more", "ethnum", "fuel-asm 0.57.1", + "fuel-compression", "fuel-crypto 0.57.1", "fuel-merkle 0.57.1", "fuel-storage 0.57.1", diff --git a/Cargo.toml b/Cargo.toml index 26a7ad1abca..a6416d7b638 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "bin/keygen", "crates/chain-config", "crates/client", + "crates/compression", "crates/database", "crates/fuel-core", "crates/fuel-gas-price-algorithm", @@ -62,6 +63,7 @@ fuel-core-keygen = { version = "0.36.0", path = "./crates/keygen" } fuel-core-keygen-bin = { version = "0.36.0", path = "./bin/keygen" } fuel-core-chain-config = { version = "0.36.0", path = "./crates/chain-config", default-features = false } fuel-core-client = { version = "0.36.0", path = "./crates/client" } +fuel-core-compression = { version = "0.36.0", path = "./crates/compression" } fuel-core-database = { version = "0.36.0", path = "./crates/database" } fuel-core-metrics = { version = "0.36.0", path = "./crates/metrics" } fuel-core-services = { version = "0.36.0", path = "./crates/services" } @@ -131,6 +133,7 @@ test-strategy = "0.3" 
parquet = { version = "49.0", default-features = false } rayon = "1.10.0" bytes = "1.5.0" +paste = "1.0" pretty_assertions = "1.4.0" proptest = "1.1" pin-project-lite = "0.2" diff --git a/bin/fuel-core/Cargo.toml b/bin/fuel-core/Cargo.toml index a15848bb37e..aa111e7637c 100644 --- a/bin/fuel-core/Cargo.toml +++ b/bin/fuel-core/Cargo.toml @@ -27,6 +27,7 @@ dirs = "4.0" dotenvy = { version = "0.15", optional = true } fuel-core = { workspace = true, features = ["wasm-executor"] } fuel-core-chain-config = { workspace = true } +fuel-core-compression = { workspace = true } fuel-core-poa = { workspace = true } fuel-core-types = { workspace = true, features = ["std"] } hex = { workspace = true } diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 8d6ceb2f8d7..0231ac9fe36 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -20,7 +20,10 @@ use fuel_core::{ CombinedDatabase, CombinedDatabaseConfig, }, - fuel_core_graphql_api::ServiceConfig as GraphQLConfig, + fuel_core_graphql_api::{ + worker_service::DaCompressionConfig, + ServiceConfig as GraphQLConfig, + }, producer::Config as ProducerConfig, service::{ config::Trigger, @@ -190,6 +193,11 @@ pub struct Command { #[cfg(feature = "aws-kms")] pub consensus_aws_kms: Option, + /// If given, the node will produce and store da-compressed blocks + /// with the given retention time. + #[arg(long = "da-compression", env)] + pub da_compression: Option, + /// A new block is produced instantly when transactions are available. 
#[clap(flatten)] pub poa_trigger: PoATriggerArgs, @@ -272,6 +280,7 @@ impl Command { consensus_key, #[cfg(feature = "aws-kms")] consensus_aws_kms, + da_compression, poa_trigger, predefined_blocks_path, coinbase_recipient, @@ -418,6 +427,15 @@ impl Command { let block_importer = fuel_core::service::config::fuel_core_importer::Config::new(); + let da_compression = match da_compression { + Some(retention) => { + DaCompressionConfig::Enabled(fuel_core_compression::Config { + temporal_registry_retention: retention.into(), + }) + } + None => DaCompressionConfig::Disabled, + }; + let TxPoolArgs { tx_pool_ttl, tx_max_number, @@ -476,6 +494,7 @@ impl Command { min_gas_price, gas_price_threshold_percent, block_importer, + da_compression, #[cfg(feature = "relayer")] relayer: relayer_cfg, #[cfg(feature = "p2p")] diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index c7692a11adb..67287341c38 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -284,6 +284,10 @@ enum ContractParametersVersion { V1 } +type DaCompressedBlock { + bytes: HexString! +} + union DependentCost = LightOperation | HeavyOperation type DryRunFailureStatus { @@ -942,6 +946,12 @@ type Query { """ excludedIds: ExcludeInput ): [[CoinType!]!]! + daCompressedBlock( + """ + Height of the block + """ + height: U32! 
+ ): DaCompressedBlock contract( """ ID of the Contract diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index dfe61d865fe..2fabf630ccb 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -81,6 +81,7 @@ use schema::{ block::BlockByIdArgs, coins::CoinByIdArgs, contract::ContractByIdArgs, + da_compressed::DaCompressedBlockByHeightArgs, tx::{ TxArg, TxIdArgs, @@ -880,6 +881,23 @@ impl FuelClient { Ok(block) } + pub async fn da_compressed_block( + &self, + height: BlockHeight, + ) -> io::Result>> { + let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build( + DaCompressedBlockByHeightArgs { + height: U32(height.into()), + }, + ); + + Ok(self + .query(query) + .await? + .da_compressed_block + .map(|b| b.bytes.into())) + } + /// Retrieve a blob by its ID pub async fn blob(&self, id: BlobId) -> io::Result> { let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() }); diff --git a/crates/client/src/client/schema.rs b/crates/client/src/client/schema.rs index 648304425aa..7930d66c1ab 100644 --- a/crates/client/src/client/schema.rs +++ b/crates/client/src/client/schema.rs @@ -32,6 +32,7 @@ pub mod block; pub mod chain; pub mod coins; pub mod contract; +pub mod da_compressed; pub mod message; pub mod node_info; pub mod upgrades; diff --git a/crates/client/src/client/schema/da_compressed.rs b/crates/client/src/client/schema/da_compressed.rs new file mode 100644 index 00000000000..73a131f1d32 --- /dev/null +++ b/crates/client/src/client/schema/da_compressed.rs @@ -0,0 +1,44 @@ +use crate::client::schema::{ + schema, + U32, +}; + +use super::HexString; + +#[derive(cynic::QueryVariables, Debug)] +pub struct DaCompressedBlockByHeightArgs { + pub height: U32, +} + +#[derive(cynic::QueryFragment, Clone, Debug)] +#[cynic( + schema_path = "./assets/schema.sdl", + graphql_type = "Query", + variables = "DaCompressedBlockByHeightArgs" +)] +pub struct DaCompressedBlockByHeightQuery { + #[arguments(height: 
$height)] + pub da_compressed_block: Option, +} + +/// Block with transaction ids +#[derive(cynic::QueryFragment, Clone, Debug)] +#[cynic(schema_path = "./assets/schema.sdl")] +pub struct DaCompressedBlock { + pub bytes: HexString, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn block_by_height_query_gql_output() { + use cynic::QueryBuilder; + let operation = + DaCompressedBlockByHeightQuery::build(DaCompressedBlockByHeightArgs { + height: U32(0), + }); + insta::assert_snapshot!(operation.query) + } +} diff --git a/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__da_compressed__tests__block_by_height_query_gql_output.snap b/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__da_compressed__tests__block_by_height_query_gql_output.snap new file mode 100644 index 00000000000..62f797d34e8 --- /dev/null +++ b/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__da_compressed__tests__block_by_height_query_gql_output.snap @@ -0,0 +1,9 @@ +--- +source: crates/client/src/client/schema/da_compressed.rs +expression: operation.query +--- +query($height: U32!) { + daCompressedBlock(height: $height) { + bytes + } +} diff --git a/crates/compression/Cargo.toml b/crates/compression/Cargo.toml new file mode 100644 index 00000000000..40e092d1e00 --- /dev/null +++ b/crates/compression/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "fuel-core-compression" +version = { workspace = true } +authors = { workspace = true } +categories = ["cryptography::cryptocurrencies"] +edition = { workspace = true } +homepage = { workspace = true } +keywords = [ + "blockchain", + "cryptocurrencies", + "fuel-core", + "fuel-client", + "fuel-compression", +] +license = { workspace = true } +repository = { workspace = true } +description = "Compression and decompression of Fuel blocks for DA storage." 
+ +[dependencies] +anyhow = { workspace = true } +fuel-core-types = { workspace = true, features = [ + "alloc", + "serde", + "da-compression", +] } +paste = { workspace = true } +rand = { workspace = true, optional = true } +serde = { version = "1.0", features = ["derive"] } +strum = { workspace = true } +strum_macros = { workspace = true } + +[dev-dependencies] +fuel-core-compression = { path = ".", features = ["test-helpers"] } +postcard = { version = "1.0", features = ["use-std"] } +proptest = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } + +[features] +test-helpers = [ + "dep:rand", + "fuel-core-types/test-helpers", + "fuel-core-types/random", + "fuel-core-types/std", +] diff --git a/crates/compression/README.md b/crates/compression/README.md new file mode 100644 index 00000000000..0768e6a1287 --- /dev/null +++ b/crates/compression/README.md @@ -0,0 +1,27 @@ +# Compression and decompression of transactions for the DA layer + +## Compressed block header + +Each compressed block begins with a version field, so that it's possible to change the format later. + +## Temporal registry + +This crate provides offchain registries for different types such as `AssetId`, `ContractId`, scripts, and predicates. Each registry is a key-value store with three-byte key. The registries are essentially compression caches. The three byte key allows cache size of 16 million values before reregistering the older values. + +The registries allow replacing repeated objects with their respective keys, so if an object +is used multiple times in a short interval (couple of months, maybe), then the full value +exists on only a single uncompressed block. + +### Fraud proofs + +Compressed block will contain a merkle root over all compression smts, followed by newly registered values along with their keys. Using an SMT provides flexibility around the algorithm we use to define keys without knowing how exactly values were chosen to be registered. 
+ +Each registry also uses an SMT. Since the keys are three bytes long, the depth of the SMT is capped at 24 levels. + +## Compression of `UtxoIds` + +Since each `UtxoId` only appears once, there's no point in registering them. Instead, they are replaced with `TxPointer` and output index, which are still unique. + +### Fraud proofs + +During fraud proofs we need to use the `prev_root` to prove that the referenced block height is part of the chain. diff --git a/crates/compression/src/compress.rs b/crates/compression/src/compress.rs new file mode 100644 index 00000000000..0ad14e39f55 --- /dev/null +++ b/crates/compression/src/compress.rs @@ -0,0 +1,247 @@ +use crate::{ + config::Config, + eviction_policy::CacheEvictor, + ports::{ + EvictorDb, + TemporalRegistry, + UtxoIdToPointer, + }, + registry::{ + EvictorDbAll, + PerRegistryKeyspace, + RegistrationsPerTable, + TemporalRegistryAll, + }, + CompressedBlockPayloadV0, + VersionedCompressedBlock, +}; +use anyhow::Context; +use fuel_core_types::{ + blockchain::block::Block, + fuel_compression::{ + CompressibleBy, + ContextError, + RegistryKey, + }, + fuel_tx::{ + input::PredicateCode, + CompressedUtxoId, + ScriptCode, + TxPointer, + UtxoId, + }, + fuel_types::{ + Address, + AssetId, + ContractId, + }, + tai64::Tai64, +}; +use std::collections::{ + HashMap, + HashSet, +}; + +pub trait CompressDb: TemporalRegistryAll + EvictorDbAll + UtxoIdToPointer {} +impl CompressDb for T where T: TemporalRegistryAll + EvictorDbAll + UtxoIdToPointer {} + +/// This must be called for all new blocks in sequence, otherwise the result will be garbage, since +/// the registry is valid for only the current block height. On any other height you could be +/// referring to keys that have already been overwritten, or have not been written to yet. 
+pub async fn compress( + config: Config, + mut db: D, + block: &Block, +) -> anyhow::Result +where + D: CompressDb, +{ + let target = block.transactions_vec(); + + let mut prepare_ctx = PrepareCtx { + config, + timestamp: block.header().time(), + db: &mut db, + accessed_keys: Default::default(), + }; + let _ = target.compress_with(&mut prepare_ctx).await?; + + let mut ctx = prepare_ctx.into_compression_context()?; + let transactions = target.compress_with(&mut ctx).await?; + let registrations: RegistrationsPerTable = ctx.finalize()?; + + Ok(VersionedCompressedBlock::V0(CompressedBlockPayloadV0 { + registrations, + header: block.header().into(), + transactions, + })) +} + +/// Preparation pass through the block to collect all keys accessed during compression. +/// Returns dummy values. The resulting "compressed block" should be discarded. +struct PrepareCtx { + config: Config, + /// Current timestamp + timestamp: Tai64, + /// Database handle + db: D, + /// Keys accessed during the compression. + accessed_keys: PerRegistryKeyspace>, +} + +impl ContextError for PrepareCtx { + type Error = anyhow::Error; +} + +impl CompressibleBy> for UtxoId +where + D: CompressDb, +{ + async fn compress_with( + &self, + _ctx: &mut PrepareCtx, + ) -> anyhow::Result { + Ok(CompressedUtxoId { + tx_pointer: TxPointer::default(), + output_index: 0, + }) + } +} + +#[derive(Debug)] +struct CompressCtxKeyspace { + /// Cache evictor state for this keyspace + cache_evictor: CacheEvictor, + /// Changes to the temporary registry, to be included in the compressed block header + changes: HashMap, + /// Reverse lookup into changes + changes_lookup: HashMap, +} + +macro_rules! compression { + ($($ident:ty: $type:ty),*) => { paste::paste! 
{ + pub struct CompressCtx { + config: Config, + timestamp: Tai64, + db: D, + $($ident: CompressCtxKeyspace<$type>,)* + } + + impl PrepareCtx where D: CompressDb { + /// Converts the preparation context into a [`CompressCtx`] + /// keeping accessed keys to avoid its eviction during compression. + /// Initializes the cache evictors from the database, which may fail. + pub fn into_compression_context(mut self) -> anyhow::Result> { + Ok(CompressCtx { + $( + $ident: CompressCtxKeyspace { + changes: Default::default(), + changes_lookup: Default::default(), + cache_evictor: CacheEvictor::new_from_db(&mut self.db, self.accessed_keys.$ident.into())?, + }, + )* + config: self.config, + timestamp: self.timestamp, + db: self.db, + }) + } + } + + impl CompressCtx where D: CompressDb { + /// Finalizes the compression context, returning the changes to the registry. + /// Commits the registrations and cache evictor states to the database. + fn finalize(mut self) -> anyhow::Result { + let mut registrations = RegistrationsPerTable::default(); + $( + self.$ident.cache_evictor.commit(&mut self.db)?; + for (key, value) in self.$ident.changes.into_iter() { + registrations.$ident.push((key, value)); + } + )* + registrations.write_to_registry(&mut self.db, self.timestamp)?; + Ok(registrations) + } + } + + $( + impl CompressibleBy> for $type + where + D: TemporalRegistry<$type> + EvictorDb<$type> + { + async fn compress_with( + &self, + ctx: &mut PrepareCtx, + ) -> anyhow::Result { + if *self == <$type>::default() { + return Ok(RegistryKey::ZERO); + } + if let Some(found) = ctx.db.registry_index_lookup(self)? { + if !ctx.accessed_keys.$ident.contains(&found) { + let key_timestamp = ctx.db.read_timestamp(&found) + .context("Database invariant violated: no timestamp stored but key found")?; + if ctx.config.is_timestamp_accessible(ctx.timestamp, key_timestamp)? 
{ + ctx.accessed_keys.$ident.insert(found); + } + } + } + Ok(RegistryKey::ZERO) + } + } + + impl CompressibleBy> for $type + where + D: TemporalRegistry<$type> + EvictorDb<$type> + { + async fn compress_with( + &self, + ctx: &mut CompressCtx, + ) -> anyhow::Result { + if self == &Default::default() { + return Ok(RegistryKey::DEFAULT_VALUE); + } + if let Some(found) = ctx.$ident.changes_lookup.get(self) { + return Ok(*found); + } + if let Some(found) = ctx.db.registry_index_lookup(self)? { + let key_timestamp = ctx.db.read_timestamp(&found) + .context("Database invariant violated: no timestamp stored but key found")?; + if ctx.config.is_timestamp_accessible(ctx.timestamp, key_timestamp)? { + return Ok(found); + } + } + + let key = ctx.$ident.cache_evictor.next_key(); + let old = ctx.$ident.changes.insert(key, self.clone()); + let old_rev = ctx.$ident.changes_lookup.insert(self.clone(), key); + debug_assert!(old.is_none(), "Key collision in registry substitution"); + debug_assert!(old_rev.is_none(), "Key collision in registry substitution"); + Ok(key) + } + } + )* + }}; +} + +compression!( + address: Address, + asset_id: AssetId, + contract_id: ContractId, + script_code: ScriptCode, + predicate_code: PredicateCode +); + +impl ContextError for CompressCtx { + type Error = anyhow::Error; +} + +impl CompressibleBy> for UtxoId +where + D: CompressDb, +{ + async fn compress_with( + &self, + ctx: &mut CompressCtx, + ) -> anyhow::Result { + ctx.db.lookup(*self) + } +} diff --git a/crates/compression/src/config.rs b/crates/compression/src/config.rs new file mode 100644 index 00000000000..cc111119c59 --- /dev/null +++ b/crates/compression/src/config.rs @@ -0,0 +1,33 @@ +use core::time::Duration; + +use fuel_core_types::tai64::{ + Tai64, + Tai64N, +}; + +#[derive(Debug, Clone, Copy)] +pub struct Config { + /// How long entries in the temporal registry are valid. + /// After this time has passed, the entry is considered stale and must not be used. 
+ /// If the value is needed again, it must be re-registered. + pub temporal_registry_retention: Duration, +} + +impl Config { + /// Given timestamp of the current block and a key in an older block, + /// is the key still accessible? + /// Returns error if the arguments are not valid block timestamps, + /// or if the block is older than the key. + pub fn is_timestamp_accessible( + &self, + block_timestamp: Tai64, + key_timestamp: Tai64, + ) -> anyhow::Result { + let block = Tai64N(block_timestamp, 0); + let key = Tai64N(key_timestamp, 0); + let duration = block + .duration_since(&key) + .map_err(|_| anyhow::anyhow!("Invalid timestamp ordering"))?; + Ok(duration <= self.temporal_registry_retention) + } +} diff --git a/crates/compression/src/decompress.rs b/crates/compression/src/decompress.rs new file mode 100644 index 00000000000..15565ce8433 --- /dev/null +++ b/crates/compression/src/decompress.rs @@ -0,0 +1,336 @@ +use crate::{ + config::Config, + ports::{ + HistoryLookup, + TemporalRegistry, + }, + registry::TemporalRegistryAll, + VersionedCompressedBlock, +}; +use fuel_core_types::{ + blockchain::block::PartialFuelBlock, + fuel_compression::{ + Compressible, + ContextError, + Decompress, + DecompressibleBy, + RegistryKey, + }, + fuel_tx::{ + input::{ + coin::{ + Coin, + CoinSpecification, + }, + message::{ + Message, + MessageSpecification, + }, + AsField, + PredicateCode, + }, + AssetId, + CompressedUtxoId, + Mint, + ScriptCode, + Transaction, + UtxoId, + }, + fuel_types::{ + Address, + ContractId, + }, + tai64::Tai64, +}; + +pub trait DecompressDb: TemporalRegistryAll + HistoryLookup {} +impl DecompressDb for T where T: TemporalRegistryAll + HistoryLookup {} + +/// This must be called for all decompressed blocks in sequence, otherwise the result will be garbage.
+pub async fn decompress( + config: Config, + mut db: D, + block: VersionedCompressedBlock, +) -> anyhow::Result +where + D: DecompressDb, +{ + let VersionedCompressedBlock::V0(compressed) = block; + + // TODO: merkle root verification: https://github.com/FuelLabs/fuel-core/issues/2232 + + compressed + .registrations + .write_to_registry(&mut db, compressed.header.consensus.time)?; + + let ctx = DecompressCtx { + config, + timestamp: compressed.header.consensus.time, + db, + }; + + let transactions = as DecompressibleBy<_>>::decompress_with( + compressed.transactions, + &ctx, + ) + .await?; + + Ok(PartialFuelBlock { + header: compressed.header, + transactions, + }) +} + +pub struct DecompressCtx { + pub config: Config, + /// Timestamp of the block being decompressed + pub timestamp: Tai64, + pub db: D, +} + +impl ContextError for DecompressCtx { + type Error = anyhow::Error; +} + +impl DecompressibleBy> for UtxoId +where + D: HistoryLookup, +{ + async fn decompress_with( + c: CompressedUtxoId, + ctx: &DecompressCtx, + ) -> anyhow::Result { + ctx.db.utxo_id(c) + } +} + +macro_rules! decompress_impl { + ($($type:ty),*) => { paste::paste! { + $( + impl DecompressibleBy> for $type + where + D: TemporalRegistry<$type> + { + async fn decompress_with( + key: RegistryKey, + ctx: &DecompressCtx, + ) -> anyhow::Result { + if key == RegistryKey::DEFAULT_VALUE { + return Ok(<$type>::default()); + } + let key_timestamp = ctx.db.read_timestamp(&key)?; + if !ctx.config.is_timestamp_accessible(ctx.timestamp, key_timestamp)? 
{ + anyhow::bail!("Timestamp not accessible"); + } + ctx.db.read_registry(&key) + } + } + )* + }}; +} + +decompress_impl!(AssetId, ContractId, Address, PredicateCode, ScriptCode); + +impl DecompressibleBy> for Coin +where + D: DecompressDb, + Specification: CoinSpecification, + Specification::Predicate: DecompressibleBy>, + Specification::PredicateData: DecompressibleBy>, + Specification::PredicateGasUsed: DecompressibleBy>, + Specification::Witness: DecompressibleBy>, +{ + async fn decompress_with( + c: as Compressible>::Compressed, + ctx: &DecompressCtx, + ) -> anyhow::Result> { + let utxo_id = UtxoId::decompress_with(c.utxo_id, ctx).await?; + let coin_info = ctx.db.coin(utxo_id)?; + let witness_index = c.witness_index.decompress(ctx).await?; + let predicate_gas_used = c.predicate_gas_used.decompress(ctx).await?; + let predicate = c.predicate.decompress(ctx).await?; + let predicate_data = c.predicate_data.decompress(ctx).await?; + Ok(Self { + utxo_id, + owner: coin_info.owner, + amount: coin_info.amount, + asset_id: coin_info.asset_id, + tx_pointer: Default::default(), + witness_index, + predicate_gas_used, + predicate, + predicate_data, + }) + } +} + +impl DecompressibleBy> for Message +where + D: DecompressDb, + Specification: MessageSpecification, + Specification::Data: DecompressibleBy> + Default, + Specification::Predicate: DecompressibleBy>, + Specification::PredicateData: DecompressibleBy>, + Specification::PredicateGasUsed: DecompressibleBy>, + Specification::Witness: DecompressibleBy>, +{ + async fn decompress_with( + c: as Compressible>::Compressed, + ctx: &DecompressCtx, + ) -> anyhow::Result> { + let msg = ctx.db.message(c.nonce)?; + let witness_index = c.witness_index.decompress(ctx).await?; + let predicate_gas_used = c.predicate_gas_used.decompress(ctx).await?; + let predicate = c.predicate.decompress(ctx).await?; + let predicate_data = c.predicate_data.decompress(ctx).await?; + let mut message: Message = Message { + sender: msg.sender, + recipient: 
msg.recipient, + amount: msg.amount, + nonce: c.nonce, + witness_index, + predicate_gas_used, + data: Default::default(), + predicate, + predicate_data, + }; + + if let Some(data) = message.data.as_mut_field() { + data.clone_from(&msg.data) + } + + Ok(message) + } +} + +impl DecompressibleBy> for Mint +where + D: DecompressDb, +{ + async fn decompress_with( + c: Self::Compressed, + ctx: &DecompressCtx, + ) -> anyhow::Result { + Ok(Transaction::mint( + Default::default(), // TODO: what should this we do with this? + c.input_contract.decompress(ctx).await?, + c.output_contract.decompress(ctx).await?, + c.mint_amount.decompress(ctx).await?, + c.mint_asset_id.decompress(ctx).await?, + c.gas_price.decompress(ctx).await?, + )) + } +} + +#[cfg(test)] +mod tests { + use crate::ports::{ + EvictorDb, + TemporalRegistry, + }; + + use super::*; + use fuel_core_types::{ + fuel_compression::RegistryKey, + fuel_tx::{ + input::PredicateCode, + Address, + AssetId, + ContractId, + ScriptCode, + }, + }; + use serde::{ + Deserialize, + Serialize, + }; + + pub struct MockDb; + impl HistoryLookup for MockDb { + fn utxo_id(&self, _: CompressedUtxoId) -> anyhow::Result { + unimplemented!() + } + + fn coin(&self, _: UtxoId) -> anyhow::Result { + unimplemented!() + } + + fn message( + &self, + _: fuel_core_types::fuel_types::Nonce, + ) -> anyhow::Result { + unimplemented!() + } + } + macro_rules! 
mock_temporal { + ($type:ty) => { + impl TemporalRegistry<$type> for MockDb { + fn read_registry(&self, _key: &RegistryKey) -> anyhow::Result<$type> { + unimplemented!() + } + + fn read_timestamp(&self, _key: &RegistryKey) -> anyhow::Result { + unimplemented!() + } + + fn write_registry( + &mut self, + _key: &RegistryKey, + _value: &$type, + _timestamp: Tai64, + ) -> anyhow::Result<()> { + unimplemented!() + } + + fn registry_index_lookup( + &self, + _value: &$type, + ) -> anyhow::Result> { + unimplemented!() + } + } + + impl EvictorDb<$type> for MockDb { + fn set_latest_assigned_key( + &mut self, + _key: RegistryKey, + ) -> anyhow::Result<()> { + unimplemented!() + } + + fn get_latest_assigned_key(&self) -> anyhow::Result> { + unimplemented!() + } + } + }; + } + mock_temporal!(Address); + mock_temporal!(AssetId); + mock_temporal!(ContractId); + mock_temporal!(ScriptCode); + mock_temporal!(PredicateCode); + + #[tokio::test] + async fn decompress_block_with_unknown_version() { + #[derive(Clone, Serialize, Deserialize)] + enum CompressedBlockWithNewVersions { + V0(crate::CompressedBlockPayloadV0), + NewVersion(u32), + #[serde(untagged)] + Unknown, + } + + // Given + let bad_block = + postcard::to_stdvec(&CompressedBlockWithNewVersions::NewVersion(1234)) + .unwrap(); + + // When + let result: Result = + postcard::from_bytes(&bad_block); + + // Then + let _ = + result.expect_err("should fail to deserialize because of unknown version"); + } +} diff --git a/crates/compression/src/eviction_policy.rs b/crates/compression/src/eviction_policy.rs new file mode 100644 index 00000000000..6343de83d8c --- /dev/null +++ b/crates/compression/src/eviction_policy.rs @@ -0,0 +1,63 @@ +use std::collections::HashSet; + +use fuel_core_types::fuel_compression::RegistryKey; + +use crate::ports::EvictorDb; + +/// Evictor for a single keyspace +#[derive(Debug)] +#[must_use = "Evictor must be committed to the database to persist state"] +pub(crate) struct CacheEvictor { + /// Set of keys that 
must not be evicted + keep_keys: HashSet, + /// Next key to be used + next_key: RegistryKey, + /// Marker for the keyspace type + _keyspace_marker: std::marker::PhantomData, +} + +impl CacheEvictor { + /// Create new evictor, reading state from the database + pub fn new_from_db( + db: &mut D, + keep_keys: HashSet, + ) -> anyhow::Result + where + D: EvictorDb, + { + let latest_key = db.get_latest_assigned_key()?; + let next_key = if let Some(latest_key) = latest_key { + latest_key.next() + } else { + RegistryKey::ZERO + }; + + Ok(Self { + keep_keys, + next_key, + _keyspace_marker: std::marker::PhantomData, + }) + } + + pub fn next_key(&mut self) -> RegistryKey { + // Pick first key not in the set + // TODO: use a proper algo, maybe LRU? + + debug_assert!(self.keep_keys.len() < 2usize.pow(24).saturating_sub(2)); + + while self.keep_keys.contains(&self.next_key) { + self.next_key = self.next_key.next(); + } + + self.keep_keys.insert(self.next_key); + self.next_key + } + + /// Commit the current state of the evictor to the database + pub fn commit(self, db: &mut D) -> anyhow::Result<()> + where + D: EvictorDb, + { + db.set_latest_assigned_key(self.next_key) + } +} diff --git a/crates/compression/src/lib.rs b/crates/compression/src/lib.rs new file mode 100644 index 00000000000..bd4b0fdcbba --- /dev/null +++ b/crates/compression/src/lib.rs @@ -0,0 +1,155 @@ +#![deny(clippy::arithmetic_side_effects)] +#![deny(clippy::cast_possible_truncation)] +#![deny(unused_crate_dependencies)] +#![deny(warnings)] + +pub mod compress; +pub mod config; +pub mod decompress; +mod eviction_policy; +pub mod ports; +mod registry; + +pub use config::Config; +pub use registry::RegistryKeyspace; + +use fuel_core_types::{ + blockchain::header::PartialBlockHeader, + fuel_tx::CompressedTransaction, +}; +use registry::RegistrationsPerTable; + +/// Compressed block, without the preceding version byte. 
+#[derive(Debug, Default, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct CompressedBlockPayloadV0 { + /// Temporal registry insertions + pub registrations: RegistrationsPerTable, + /// Compressed block header + pub header: PartialBlockHeader, + /// Compressed transactions + pub transactions: Vec, +} + +/// Versioned compressed block. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum VersionedCompressedBlock { + V0(CompressedBlockPayloadV0), +} + +impl Default for VersionedCompressedBlock { + fn default() -> Self { + Self::V0(Default::default()) + } +} + +#[cfg(test)] +mod tests { + use fuel_core_compression as _; + use fuel_core_types::{ + blockchain::{ + header::{ + ApplicationHeader, + ConsensusHeader, + }, + primitives::Empty, + }, + fuel_compression::RegistryKey, + tai64::Tai64, + }; + use proptest::prelude::*; + + use super::*; + + fn keyspace() -> impl Strategy { + prop_oneof![ + Just(RegistryKeyspace::Address), + Just(RegistryKeyspace::AssetId), + Just(RegistryKeyspace::ContractId), + Just(RegistryKeyspace::ScriptCode), + Just(RegistryKeyspace::PredicateCode), + ] + } + + proptest! { + /// Serialization for compressed transactions is already tested in fuel-vm, + /// but the rest of the block de/serialization is tested here. 
+ #[test] + fn postcard_roundtrip( + da_height in 0..=u64::MAX, + prev_root in prop::array::uniform32(0..=u8::MAX), + height in 0..=u32::MAX, + consensus_parameters_version in 0..=u32::MAX, + state_transition_bytecode_version in 0..=u32::MAX, + registration_inputs in prop::collection::vec( + (keyspace(), prop::num::u16::ANY, prop::array::uniform32(0..=u8::MAX)).prop_map(|(ks, rk, arr)| { + let k = RegistryKey::try_from(rk as u32).unwrap(); + (ks, k, arr) + }), + 0..123 + ), + ) { + let mut registrations: RegistrationsPerTable = Default::default(); + + for (ks, key, arr) in registration_inputs { + let value_len_limit = (key.as_u32() % 32) as usize; + match ks { + RegistryKeyspace::Address => { + registrations.address.push((key, arr.into())); + } + RegistryKeyspace::AssetId => { + registrations.asset_id.push((key, arr.into())); + } + RegistryKeyspace::ContractId => { + registrations.contract_id.push((key, arr.into())); + } + RegistryKeyspace::ScriptCode => { + registrations.script_code.push((key, arr[..value_len_limit].to_vec().into())); + } + RegistryKeyspace::PredicateCode => { + registrations.predicate_code.push((key, arr[..value_len_limit].to_vec().into())); + } + } + } + + let header = PartialBlockHeader { + application: ApplicationHeader { + da_height: da_height.into(), + consensus_parameters_version, + state_transition_bytecode_version, + generated: Empty, + }, + consensus: ConsensusHeader { + prev_root: prev_root.into(), + height: height.into(), + time: Tai64::UNIX_EPOCH, + generated: Empty + } + }; + let original = CompressedBlockPayloadV0 { + registrations, + header, + transactions: vec![], + }; + + let compressed = postcard::to_allocvec(&original).unwrap(); + let decompressed: CompressedBlockPayloadV0 = + postcard::from_bytes(&compressed).unwrap(); + + let CompressedBlockPayloadV0 { + registrations, + header, + transactions, + } = decompressed; + + assert_eq!(registrations, original.registrations); + + assert_eq!(header.da_height, da_height.into()); + 
assert_eq!(*header.prev_root(), prev_root.into()); + assert_eq!(*header.height(), height.into()); + assert_eq!(header.consensus_parameters_version, consensus_parameters_version); + assert_eq!(header.state_transition_bytecode_version, state_transition_bytecode_version); + + assert!(transactions.is_empty()); + } + } +} diff --git a/crates/compression/src/ports.rs b/crates/compression/src/ports.rs new file mode 100644 index 00000000000..50b2acd6fe8 --- /dev/null +++ b/crates/compression/src/ports.rs @@ -0,0 +1,121 @@ +//! Ports this service requires to function. + +use fuel_core_types::{ + fuel_compression::RegistryKey, + fuel_tx::{ + Address, + AssetId, + CompressedUtxoId, + UtxoId, + Word, + }, + fuel_types::Nonce, + tai64::Tai64, +}; + +/// Rolling cache for compression. +/// Holds the latest state which can be event sourced from the compressed blocks. +/// The changes done using this trait in a single call to `compress` or `decompress` +/// must be committed atomically, after which block height must be incremented. +pub trait TemporalRegistry { + /// Reads a value from the registry at its current height. + fn read_registry(&self, key: &RegistryKey) -> anyhow::Result; + + /// Reads timestamp of the value from the registry. + fn read_timestamp(&self, key: &RegistryKey) -> anyhow::Result; + + /// Writes a value from to the registry. The timestamp is the time of the block, + /// and it is used for key retention. + fn write_registry( + &mut self, + key: &RegistryKey, + value: &T, + timestamp: Tai64, + ) -> anyhow::Result<()>; + + /// Lookup registry key by the value. 
+ fn registry_index_lookup(&self, value: &T) -> anyhow::Result>; +} + +impl TemporalRegistry for &mut D +where + D: TemporalRegistry, +{ + fn read_registry(&self, key: &RegistryKey) -> anyhow::Result { + >::read_registry(self, key) + } + + fn read_timestamp(&self, key: &RegistryKey) -> anyhow::Result { + >::read_timestamp(self, key) + } + + fn write_registry( + &mut self, + key: &RegistryKey, + value: &T, + timestamp: Tai64, + ) -> anyhow::Result<()> { + >::write_registry(self, key, value, timestamp) + } + + fn registry_index_lookup(&self, value: &T) -> anyhow::Result> { + >::registry_index_lookup(self, value) + } +} + +/// Lookup for UTXO pointers used for compression. +pub trait UtxoIdToPointer { + fn lookup(&self, utxo_id: UtxoId) -> anyhow::Result; +} + +impl UtxoIdToPointer for &mut D +where + D: UtxoIdToPointer, +{ + fn lookup(&self, utxo_id: UtxoId) -> anyhow::Result { + ::lookup(self, utxo_id) + } +} + +/// Lookup for history of UTXOs and messages, used for decompression. +pub trait HistoryLookup { + fn utxo_id(&self, c: CompressedUtxoId) -> anyhow::Result; + fn coin(&self, utxo_id: UtxoId) -> anyhow::Result; + fn message(&self, nonce: Nonce) -> anyhow::Result; +} + +/// Information about a coin. +#[derive(Debug, Clone)] +pub struct CoinInfo { + pub owner: Address, + pub amount: u64, + pub asset_id: AssetId, +} + +/// Information about a message. +#[derive(Debug, Clone)] +pub struct MessageInfo { + pub sender: Address, + pub recipient: Address, + pub amount: Word, + pub data: Vec, +} + +/// Evictor registry to keep track of the latest used key for the type `T`. 
+pub trait EvictorDb { + fn get_latest_assigned_key(&self) -> anyhow::Result>; + fn set_latest_assigned_key(&mut self, key: RegistryKey) -> anyhow::Result<()>; +} + +impl EvictorDb for &mut D +where + D: EvictorDb, +{ + fn get_latest_assigned_key(&self) -> anyhow::Result> { + >::get_latest_assigned_key(self) + } + + fn set_latest_assigned_key(&mut self, key: RegistryKey) -> anyhow::Result<()> { + >::set_latest_assigned_key(self, key) + } +} diff --git a/crates/compression/src/registry.rs b/crates/compression/src/registry.rs new file mode 100644 index 00000000000..0bf1e3a5967 --- /dev/null +++ b/crates/compression/src/registry.rs @@ -0,0 +1,119 @@ +use crate::ports::{ + EvictorDb, + TemporalRegistry, +}; +use fuel_core_types::{ + fuel_compression::RegistryKey, + fuel_tx::{ + input::PredicateCode, + Address, + AssetId, + ContractId, + ScriptCode, + }, + tai64::Tai64, +}; + +macro_rules! tables { + ($($ident:ty: $type:ty),*) => { paste::paste! { + #[doc = "RegistryKey namespaces"] + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, strum_macros::EnumCount)] + pub enum RegistryKeyspace { + $( + [<$type>], + )* + } + + #[doc = "A value for each keyspace"] + #[derive(Debug, Clone, PartialEq, Eq, Default)] + pub struct PerRegistryKeyspace { + $(pub $ident: T,)* + } + impl core::ops::Index for PerRegistryKeyspace { + type Output = T; + + fn index(&self, index: RegistryKeyspace) -> &Self::Output { + match index { + $( + RegistryKeyspace::[<$type>] => &self.$ident, + )* + } + } + } + impl core::ops::IndexMut for PerRegistryKeyspace { + fn index_mut(&mut self, index: RegistryKeyspace) -> &mut Self::Output { + match index { + $( + RegistryKeyspace::[<$type>] => &mut self.$ident, + )* + } + } + } + + #[doc = "The set of registrations for each table, as used in the compressed block header"] + #[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] + pub struct RegistrationsPerTable { + $(pub $ident: 
Vec<(RegistryKey, $type)>,)* + } + + pub trait TemporalRegistryAll + where + $(Self: TemporalRegistry<$type>,)* + {} + + impl TemporalRegistryAll for T + where + $(T: TemporalRegistry<$type>,)* + {} + + pub trait EvictorDbAll + where + $(Self: EvictorDb<$type>,)* + {} + + impl EvictorDbAll for T + where + $(T: EvictorDb<$type>,)* + {} + + + impl RegistrationsPerTable { + pub(crate) fn write_to_registry(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()> + where + R: TemporalRegistryAll + { + $( + for (key, value) in self.$ident.iter() { + registry.write_registry(key, value, timestamp)?; + } + )* + + Ok(()) + } + } + }}; +} + +tables!( + address: Address, + asset_id: AssetId, + contract_id: ContractId, + script_code: ScriptCode, + predicate_code: PredicateCode +); + +// TODO: move inside the macro when this stabilizes: https://github.com/rust-lang/rust/pull/122808 +#[cfg(any(test, feature = "test-helpers"))] +impl rand::prelude::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> RegistryKeyspace { + use strum::EnumCount; + match rng.gen_range(0..RegistryKeyspace::COUNT) { + 0 => RegistryKeyspace::Address, + 1 => RegistryKeyspace::AssetId, + 2 => RegistryKeyspace::ContractId, + 3 => RegistryKeyspace::ScriptCode, + 4 => RegistryKeyspace::PredicateCode, + _ => unreachable!("New keyspace is added but not supported here"), + } + } +} diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 5161914d274..e2da4fa8521 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -22,6 +22,7 @@ clap = { workspace = true, features = ["derive"] } derive_more = { version = "0.99" } enum-iterator = { workspace = true } fuel-core-chain-config = { workspace = true, features = ["std"] } +fuel-core-compression = { workspace = true } fuel-core-consensus-module = { workspace = true } fuel-core-database = { workspace = true } fuel-core-executor = { workspace = true, features = ["std"] } @@ -44,6 
+45,7 @@ hyper = { workspace = true } indicatif = { workspace = true, default-features = true } itertools = { workspace = true } num_cpus = { version = "1.16.0", optional = true } +paste = { workspace = true } postcard = { workspace = true, optional = true } rand = { workspace = true } rocksdb = { version = "0.21", default-features = false, features = [ @@ -95,6 +97,7 @@ test-helpers = [ "fuel-core-p2p?/test-helpers", "fuel-core-storage/test-helpers", "fuel-core-chain-config/test-helpers", + "fuel-core-compression/test-helpers", "fuel-core-txpool/test-helpers", "fuel-core-services/test-helpers", "fuel-core-importer/test-helpers", diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 8eeae002f6e..fad33dbc59d 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -8,6 +8,7 @@ use std::{ }; pub mod api_service; +mod da_compression; pub mod database; pub(crate) mod metrics_extension; pub mod ports; @@ -41,6 +42,7 @@ pub struct Costs { pub storage_read: usize, pub storage_iterator: usize, pub bytecode_read: usize, + pub da_compressed_block_read: usize, } pub const QUERY_COSTS: Costs = Costs { @@ -60,6 +62,7 @@ pub const QUERY_COSTS: Costs = Costs { storage_read: 10, storage_iterator: 100, bytecode_read: 2000, + da_compressed_block_read: 1000, }; #[derive(Clone, Debug)] diff --git a/crates/fuel-core/src/graphql_api/da_compression.rs b/crates/fuel-core/src/graphql_api/da_compression.rs new file mode 100644 index 00000000000..e9d11d1c22e --- /dev/null +++ b/crates/fuel-core/src/graphql_api/da_compression.rs @@ -0,0 +1,212 @@ +use crate::fuel_core_graphql_api::{ + ports::worker::OffChainDatabaseTransaction, + storage::da_compression::{ + evictor_cache::MetadataKey, + timestamps::{ + TimestampKey, + TimestampKeyspace, + }, + *, + }, +}; +use fuel_core_compression::{ + compress::compress, + config::Config, + ports::{ + EvictorDb, + TemporalRegistry, + UtxoIdToPointer, + }, +}; +use 
fuel_core_storage::{ + not_found, + StorageAsMut, + StorageAsRef, +}; +use fuel_core_types::{ + blockchain::block::Block, + fuel_tx::{ + input::PredicateCode, + Address, + AssetId, + ContractId, + ScriptCode, + }, + services::executor::Event, + tai64::Tai64, +}; +use futures::FutureExt; + +/// Performs DA compression for a block and stores it in the database. +pub fn da_compress_block( + config: Config, + block: &Block, + block_events: &[Event], + db_tx: &mut T, +) -> anyhow::Result<()> +where + T: OffChainDatabaseTransaction, +{ + let compressed = compress( + config, + CompressTx { + db_tx, + block_events, + }, + block, + ) + .now_or_never() + .expect("The current implementation resolved all futures instantly")?; + + db_tx + .storage_as_mut::() + .insert(&block.header().consensus().height, &compressed)?; + + Ok(()) +} + +struct CompressTx<'a, Tx> { + db_tx: &'a mut Tx, + block_events: &'a [Event], +} + +macro_rules! impl_temporal_registry { + ($type:ty) => { paste::paste! { + impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx> + where + Tx: OffChainDatabaseTransaction, + { + fn read_registry( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<$type> { + Ok(self + .db_tx + .storage_as_ref::<[< DaCompressionTemporalRegistry $type >]>() + .get(key)? + .ok_or(not_found!([< DaCompressionTemporalRegistry $type>]))? + .into_owned()) + } + + fn read_timestamp( + &self, + key: &fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result { + Ok(self + .db_tx + .storage_as_ref::<[< DaCompressionTemporalRegistryTimestamps >]>() + .get(&TimestampKey { + keyspace: TimestampKeyspace::$type, + key: *key, + })? + .ok_or(not_found!(DaCompressionTemporalRegistryTimestamps))? 
+ .into_owned()) + } + + fn write_registry( + &mut self, + key: &fuel_core_types::fuel_compression::RegistryKey, + value: &$type, + timestamp: Tai64, + ) -> anyhow::Result<()> { + // Write the actual value + let old_value = self.db_tx + .storage_as_mut::<[< DaCompressionTemporalRegistry $type >]>() + .replace(key, value)?; + + // Remove the overwritten value from index, if any + if let Some(old_value) = old_value { + let old_reverse_key = (&old_value).into(); + self.db_tx + .storage_as_mut::() + .remove(&old_reverse_key)?; + } + + // Add the new value to the index + let reverse_key = value.into(); + self.db_tx + .storage_as_mut::() + .insert(&reverse_key, key)?; + + // Update the timestamp + self.db_tx + .storage_as_mut::() + .insert(&TimestampKey { keyspace: TimestampKeyspace::$type, key: *key }, ×tamp)?; + + Ok(()) + } + + fn registry_index_lookup( + &self, + value: &$type, + ) -> anyhow::Result> + { + let reverse_key = value.into(); + Ok(self + .db_tx + .storage_as_ref::() + .get(&reverse_key)? + .map(|v| v.into_owned())) + } + } + + impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx> + where + Tx: OffChainDatabaseTransaction, + { + fn set_latest_assigned_key( + &mut self, + key: fuel_core_types::fuel_compression::RegistryKey, + ) -> anyhow::Result<()> { + self.db_tx + .storage_as_mut::() + .insert(&MetadataKey::$type, &key)?; + Ok(()) + } + + fn get_latest_assigned_key( + &self, + ) -> anyhow::Result> { + Ok(self + .db_tx + .storage_as_ref::() + .get(&MetadataKey::$type)? 
+ .map(|v| v.into_owned()) + ) + } + } + + }}; +} + +impl_temporal_registry!(Address); +impl_temporal_registry!(AssetId); +impl_temporal_registry!(ContractId); +impl_temporal_registry!(ScriptCode); +impl_temporal_registry!(PredicateCode); + +impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx> +where + Tx: OffChainDatabaseTransaction, +{ + fn lookup( + &self, + utxo_id: fuel_core_types::fuel_tx::UtxoId, + ) -> anyhow::Result { + for event in self.block_events { + match event { + Event::CoinCreated(coin) | Event::CoinConsumed(coin) + if coin.utxo_id == utxo_id => + { + let output_index = coin.utxo_id.output_index(); + return Ok(fuel_core_types::fuel_tx::CompressedUtxoId { + tx_pointer: coin.tx_pointer, + output_index, + }); + } + _ => {} + } + } + anyhow::bail!("UtxoId not found in the block events"); + } +} diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index ca1dd6ca972..53d2dbb39bf 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -69,6 +69,8 @@ use std::{ sync::Arc, }; +use super::ports::DatabaseDaCompressedBlocks; + mod arc_wrapper; /// The on-chain view of the database used by the [`ReadView`] to fetch on-chain data. 
@@ -210,6 +212,16 @@ impl DatabaseBlocks for ReadView { } } +impl DatabaseDaCompressedBlocks for ReadView { + fn da_compressed_block(&self, id: &BlockHeight) -> StorageResult> { + self.off_chain.da_compressed_block(id) + } + + fn latest_height(&self) -> StorageResult { + self.on_chain.latest_height() + } +} + impl StorageInspect for ReadView where M: Mappable, @@ -286,6 +298,10 @@ impl OffChainDatabase for ReadView { self.off_chain.block_height(block_id) } + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { + self.off_chain.da_compressed_block(height) + } + fn tx_status(&self, tx_id: &TxId) -> StorageResult { self.off_chain.tx_status(tx_id) } diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 2dcd596f2eb..e9a9e5255b4 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -69,6 +69,8 @@ use std::sync::Arc; pub trait OffChainDatabase: Send + Sync { fn block_height(&self, block_id: &BlockId) -> StorageResult; + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult>; + fn tx_status(&self, tx_id: &TxId) -> StorageResult; fn owned_coins_ids( @@ -150,6 +152,14 @@ pub trait DatabaseBlocks { fn consensus(&self, id: &BlockHeight) -> StorageResult; } +/// Trait that specifies all the getters required for DA compressed blocks. +pub trait DatabaseDaCompressedBlocks { + /// Get a DA compressed block by its height. + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult>; + + fn latest_height(&self) -> StorageResult; +} + /// Trait that specifies all the getters required for messages. 
pub trait DatabaseMessages: StorageInspect { fn all_messages( @@ -268,6 +278,7 @@ pub mod worker { }, }, graphql_api::storage::{ + da_compression::*, old::{ OldFuelBlockConsensus, OldFuelBlocks, @@ -321,6 +332,15 @@ pub mod worker { + StorageMutate + StorageMutate + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate + + StorageMutate { fn record_tx_id_owner( &mut self, diff --git a/crates/fuel-core/src/graphql_api/storage.rs b/crates/fuel-core/src/graphql_api/storage.rs index 1b77c07cbdc..8f8cfcd1f19 100644 --- a/crates/fuel-core/src/graphql_api/storage.rs +++ b/crates/fuel-core/src/graphql_api/storage.rs @@ -39,6 +39,7 @@ use statistic::StatisticTable; pub mod blocks; pub mod coins; pub mod contracts; +pub mod da_compression; pub mod messages; pub mod old; pub mod statistic; @@ -93,6 +94,25 @@ pub enum Column { /// Existence of a key in this column means that the message has been spent. /// See [`SpentMessages`](messages::SpentMessages) SpentMessages = 13, + /// DA compression and postcard serialized blocks. 
+ /// See [`DaCompressedBlocks`](da_compression::DaCompressedBlocks) + DaCompressedBlocks = 14, + /// See [`DaCompressionTemporalRegistryIndex`](da_compression::DaCompressionTemporalRegistryIndex) + DaCompressionTemporalRegistryIndex = 15, + /// See [`DaCompressionTemporalRegistryTimestamps`](da_compression::DaCompressionTemporalRegistryTimestamps) + DaCompressionTemporalRegistryTimestamps = 16, + /// See [`DaCompressionTemporalRegistryEvictorCache`](da_compression::DaCompressionTemporalRegistryEvictorCache) + DaCompressionTemporalRegistryEvictorCache = 17, + /// See [`DaCompressionTemporalRegistryAddress`](da_compression::DaCompressionTemporalRegistryAddress) + DaCompressionTemporalRegistryAddress = 18, + /// See [`DaCompressionTemporalRegistryAssetId`](da_compression::DaCompressionTemporalRegistryAssetId) + DaCompressionTemporalRegistryAssetId = 19, + /// See [`DaCompressionTemporalRegistryContractId`](da_compression::DaCompressionTemporalRegistryContractId) + DaCompressionTemporalRegistryContractId = 20, + /// See [`DaCompressionTemporalRegistryScriptCode`](da_compression::DaCompressionTemporalRegistryScriptCode) + DaCompressionTemporalRegistryScriptCode = 21, + /// See [`DaCompressionTemporalRegistryPredicateCode`](da_compression::DaCompressionTemporalRegistryPredicateCode) + DaCompressionTemporalRegistryPredicateCode = 22, } impl Column { diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression.rs b/crates/fuel-core/src/graphql_api/storage/da_compression.rs new file mode 100644 index 00000000000..54b21073ed6 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression.rs @@ -0,0 +1,200 @@ +use self::{ + evictor_cache::MetadataKey, + predicate_code_codec::PredicateCodeCodec, + reverse_key::ReverseKey, + script_code_codec::ScriptCodeCodec, + timestamps::TimestampKey, +}; +use fuel_core_compression::VersionedCompressedBlock; +use fuel_core_storage::{ + blueprint::plain::Plain, + codec::{ + postcard::Postcard, + primitive::Primitive, + 
raw::Raw, + }, + structured_storage::TableWithBlueprint, + Mappable, +}; +use fuel_core_types::{ + fuel_compression::RegistryKey, + fuel_tx::{ + input::PredicateCode, + Address, + AssetId, + ContractId, + ScriptCode, + }, + fuel_types::BlockHeight, + tai64::Tai64, +}; + +pub mod evictor_cache; +pub mod predicate_code_codec; +pub mod reverse_key; +pub mod script_code_codec; +pub mod timestamps; + +/// The table for the compressed blocks sent to DA. +pub struct DaCompressedBlocks; + +impl Mappable for DaCompressedBlocks { + type Key = Self::OwnedKey; + type OwnedKey = BlockHeight; + type Value = Self::OwnedValue; + type OwnedValue = VersionedCompressedBlock; +} + +impl TableWithBlueprint for DaCompressedBlocks { + type Blueprint = Plain, Postcard>; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::DaCompressedBlocks + } +} + +/// Mapping from the type to the registry key in the temporal registry. +pub struct DaCompressionTemporalRegistryIndex; + +impl Mappable for DaCompressionTemporalRegistryIndex { + type Key = Self::OwnedKey; + type OwnedKey = ReverseKey; + type Value = Self::OwnedValue; + type OwnedValue = RegistryKey; +} + +impl TableWithBlueprint for DaCompressionTemporalRegistryIndex { + // TODO: Use Raw codec for value instead of Postcard + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::DaCompressionTemporalRegistryIndex + } +} + +/// This table keeps track of last written timestamp for each key, +/// so that we can keep track of expiration. 
+pub struct DaCompressionTemporalRegistryTimestamps; + +impl Mappable for DaCompressionTemporalRegistryTimestamps { + type Key = Self::OwnedKey; + type OwnedKey = TimestampKey; + type Value = Self::OwnedValue; + type OwnedValue = Tai64; +} + +impl TableWithBlueprint for DaCompressionTemporalRegistryTimestamps { + // TODO: Use Raw codec for value instead of Postcard + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::DaCompressionTemporalRegistryTimestamps + } +} + +/// This table is used to hold "next key to evict" for each keyspace. +/// In the future we'll likely switch to use LRU or something, in which +/// case this table can be repurposed. +pub struct DaCompressionTemporalRegistryEvictorCache; + +impl Mappable for DaCompressionTemporalRegistryEvictorCache { + type Key = Self::OwnedKey; + type OwnedKey = MetadataKey; + type Value = Self::OwnedValue; + type OwnedValue = RegistryKey; +} + +impl TableWithBlueprint for DaCompressionTemporalRegistryEvictorCache { + // TODO: Use Raw codec for value instead of Postcard + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::DaCompressionTemporalRegistryEvictorCache + } +} + +macro_rules! temporal_registry { + ($type:ty, $code:ty) => { + paste::paste! 
{ + pub struct [< DaCompressionTemporalRegistry $type >]; + + impl Mappable for [< DaCompressionTemporalRegistry $type >] { + type Key = Self::OwnedKey; + type OwnedKey = RegistryKey; + type Value = Self::OwnedValue; + type OwnedValue = $type; + } + + impl TableWithBlueprint for [< DaCompressionTemporalRegistry $type >] { + // TODO: Use Raw codec for value instead of Postcard + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::[< DaCompressionTemporalRegistry $type >] + } + } + + + #[cfg(test)] + fuel_core_storage::basic_storage_tests!( + [< DaCompressionTemporalRegistry $type >], + RegistryKey::ZERO, + <[< DaCompressionTemporalRegistry $type >] as Mappable>::Value::default(), + <[< DaCompressionTemporalRegistry $type >] as Mappable>::Value::default(), + tests::generate_key + ); + } + }; +} + +temporal_registry!(Address, Raw); +temporal_registry!(AssetId, Raw); +temporal_registry!(ContractId, Raw); +temporal_registry!(ScriptCode, ScriptCodeCodec); +temporal_registry!(PredicateCode, PredicateCodeCodec); + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(test)] + fuel_core_storage::basic_storage_tests!( + DaCompressionTemporalRegistryIndex, + ReverseKey::Address(Address::zeroed()), + RegistryKey::ZERO + ); + + #[cfg(test)] + fuel_core_storage::basic_storage_tests!( + DaCompressionTemporalRegistryTimestamps, + TimestampKey { + keyspace: timestamps::TimestampKeyspace::Address, + key: RegistryKey::ZERO + }, + Tai64::UNIX_EPOCH + ); + + #[cfg(test)] + fuel_core_storage::basic_storage_tests!( + DaCompressionTemporalRegistryEvictorCache, + MetadataKey::Address, + RegistryKey::ZERO + ); + + fuel_core_storage::basic_storage_tests!( + DaCompressedBlocks, + ::Key::default(), + ::Value::default() + ); + + #[allow(clippy::arithmetic_side_effects)] // Test code, and also safe + pub fn generate_key(rng: &mut impl rand::Rng) -> RegistryKey { + let raw_key: u32 = rng.gen_range(0..2u32.pow(24) - 2); + 
RegistryKey::try_from(raw_key).unwrap() + } +} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs new file mode 100644 index 00000000000..870d02722f6 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression/evictor_cache.rs @@ -0,0 +1,34 @@ +/// The metadata key used by `DaCompressionTemporalRegistryEvictorCache` table to +/// store progress of the evictor. +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + strum::EnumCount, +)] +pub enum MetadataKey { + Address, + AssetId, + ContractId, + ScriptCode, + PredicateCode, +} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> MetadataKey { + use strum::EnumCount; + match rng.next_u32() as usize % MetadataKey::COUNT { + 0 => MetadataKey::Address, + 1 => MetadataKey::AssetId, + 2 => MetadataKey::ContractId, + 3 => MetadataKey::ScriptCode, + 4 => MetadataKey::PredicateCode, + _ => unreachable!("New metadata key is added but not supported here"), + } + } +} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs new file mode 100644 index 00000000000..6c165c09f3a --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression/predicate_code_codec.rs @@ -0,0 +1,28 @@ +use fuel_core_storage::codec::{ + Decode, + Encode, +}; +use fuel_core_types::fuel_tx::input::PredicateCode; +use std::{ + borrow::Cow, + ops::Deref, +}; + +// TODO: Remove this codec when the `PredicateCode` implements +// `AsRef<[u8]>` and `TryFrom<[u8]>` and use `Raw` codec instead. 
+ +pub struct PredicateCodeCodec; + +impl Encode for PredicateCodeCodec { + type Encoder<'a> = Cow<'a, [u8]>; + + fn encode(t: &PredicateCode) -> Self::Encoder<'_> { + Cow::Borrowed(t.deref()) + } +} + +impl Decode for PredicateCodeCodec { + fn decode(bytes: &[u8]) -> anyhow::Result { + Ok(bytes.to_vec().into()) + } +} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs new file mode 100644 index 00000000000..3c24d3a817f --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression/reverse_key.rs @@ -0,0 +1,82 @@ +use fuel_core_types::{ + fuel_tx::{ + input::PredicateCode, + ScriptCode, + }, + fuel_types::{ + Address, + AssetId, + Bytes32, + ContractId, + }, +}; +use std::ops::Deref; + +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + strum::EnumCount, +)] +/// The reverse key for the temporal registry index. +/// By this key we can find the registry key from the temporal registry. +pub enum ReverseKey { + Address(Address), + AssetId(AssetId), + ContractId(ContractId), + /// Hash of the script code. + ScriptCode(Bytes32), + /// Hash of the predicate code. 
+ PredicateCode(Bytes32), +} + +impl From<&Address> for ReverseKey { + fn from(address: &Address) -> Self { + Self::Address(*address) + } +} + +impl From<&AssetId> for ReverseKey { + fn from(asset_id: &AssetId) -> Self { + Self::AssetId(*asset_id) + } +} + +impl From<&ContractId> for ReverseKey { + fn from(contract_id: &ContractId) -> Self { + Self::ContractId(*contract_id) + } +} + +impl From<&ScriptCode> for ReverseKey { + fn from(script_code: &ScriptCode) -> Self { + let hash = fuel_core_types::fuel_crypto::Hasher::hash(script_code.deref()); + ReverseKey::ScriptCode(hash) + } +} + +impl From<&PredicateCode> for ReverseKey { + fn from(predicate_code: &PredicateCode) -> Self { + let hash = fuel_core_types::fuel_crypto::Hasher::hash(predicate_code.deref()); + ReverseKey::PredicateCode(hash) + } +} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> ReverseKey { + use strum::EnumCount; + match rng.next_u32() as usize % ReverseKey::COUNT { + 0 => ReverseKey::Address(Address::default()), + 1 => ReverseKey::AssetId(AssetId::default()), + 2 => ReverseKey::ContractId(ContractId::default()), + 3 => ReverseKey::ScriptCode(Bytes32::default()), + 4 => ReverseKey::PredicateCode(Bytes32::default()), + _ => unreachable!("New reverse key is added but not supported here"), + } + } +} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs new file mode 100644 index 00000000000..a4d6c8d1ac3 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression/script_code_codec.rs @@ -0,0 +1,28 @@ +use fuel_core_storage::codec::{ + Decode, + Encode, +}; +use fuel_core_types::fuel_tx::ScriptCode; +use std::{ + borrow::Cow, + ops::Deref, +}; + +// TODO: Remove this codec when the `ScriptCode` implements +// `AsRef<[u8]>` and `TryFrom<[u8]>` and use `Raw` codec instead. 
+ +pub struct ScriptCodeCodec; + +impl Encode for ScriptCodeCodec { + type Encoder<'a> = Cow<'a, [u8]>; + + fn encode(t: &ScriptCode) -> Self::Encoder<'_> { + Cow::Borrowed(t.deref()) + } +} + +impl Decode for ScriptCodeCodec { + fn decode(bytes: &[u8]) -> anyhow::Result { + Ok(bytes.to_vec().into()) + } +} diff --git a/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs b/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs new file mode 100644 index 00000000000..dc8f016f50b --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/da_compression/timestamps.rs @@ -0,0 +1,57 @@ +use fuel_core_types::fuel_compression::RegistryKey; + +/// The metadata key used by `DaCompressionTemporalRegistryTimsetamps` table to +/// keep track of when each key was last updated. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct TimestampKey { + /// The column where the key is stored. + pub keyspace: TimestampKeyspace, + /// The key itself. 
+ pub key: RegistryKey, +} + +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + strum::EnumCount, +)] +pub enum TimestampKeyspace { + Address, + AssetId, + ContractId, + ScriptCode, + PredicateCode, +} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { + #![allow(clippy::arithmetic_side_effects)] // Test-only code, and also safe + fn sample(&self, rng: &mut R) -> TimestampKey { + TimestampKey { + keyspace: rng.gen(), + key: RegistryKey::try_from(rng.gen_range(0..2u32.pow(24) - 2)).unwrap(), + } + } +} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution + for rand::distributions::Standard +{ + fn sample(&self, rng: &mut R) -> TimestampKeyspace { + use strum::EnumCount; + match rng.next_u32() as usize % TimestampKeyspace::COUNT { + 0 => TimestampKeyspace::Address, + 1 => TimestampKeyspace::AssetId, + 2 => TimestampKeyspace::ContractId, + 3 => TimestampKeyspace::ScriptCode, + 4 => TimestampKeyspace::PredicateCode, + _ => unreachable!("New metadata key is added but not supported here"), + } + } +} diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 1c19788d194..a1736aeefc7 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -1,12 +1,17 @@ -use super::storage::old::{ - OldFuelBlockConsensus, - OldFuelBlocks, - OldTransactions, +use super::{ + da_compression::da_compress_block, + storage::old::{ + OldFuelBlockConsensus, + OldFuelBlocks, + OldTransactions, + }, }; use crate::{ fuel_core_graphql_api::{ - ports, - ports::worker::OffChainDatabaseTransaction, + ports::{ + self, + worker::OffChainDatabaseTransaction, + }, storage::{ blocks::FuelBlockIdsToHeights, coins::{ @@ -93,9 +98,16 @@ use std::{ #[cfg(test)] mod tests; +#[derive(Debug, Clone)] +pub enum DaCompressionConfig { + Disabled, + 
Enabled(fuel_core_compression::config::Config), +} + /// The initialization task recovers the state of the GraphQL service database on startup. pub struct InitializeTask { chain_id: ChainId, + da_compression_config: DaCompressionConfig, continue_on_error: bool, tx_pool: TxPool, blocks_events: BoxStream, @@ -111,6 +123,7 @@ pub struct Task { block_importer: BoxStream, database: D, chain_id: ChainId, + da_compression_config: DaCompressionConfig, continue_on_error: bool, } @@ -134,7 +147,7 @@ where let height = block.header().height(); let block_id = block.id(); transaction - .storage::() + .storage_as_mut::() .insert(&block_id, height)?; let total_tx_count = transaction @@ -146,6 +159,13 @@ where &mut transaction, )?; + match self.da_compression_config { + DaCompressionConfig::Disabled => {} + DaCompressionConfig::Enabled(config) => { + da_compress_block(config, block, &result.events, &mut transaction)?; + } + } + transaction.commit()?; for status in result.tx_status.iter() { @@ -455,6 +475,7 @@ where let InitializeTask { chain_id, + da_compression_config, tx_pool, block_importer, blocks_events, @@ -468,6 +489,7 @@ where block_importer: blocks_events, database: off_chain_database, chain_id, + da_compression_config, continue_on_error, }; @@ -579,6 +601,7 @@ pub fn new_service( on_chain_database: OnChain, off_chain_database: OffChain, chain_id: ChainId, + da_compression_config: DaCompressionConfig, continue_on_error: bool, ) -> ServiceRunner> where @@ -594,6 +617,7 @@ where on_chain_database, off_chain_database, chain_id, + da_compression_config, continue_on_error, }) } diff --git a/crates/fuel-core/src/graphql_api/worker_service/tests.rs b/crates/fuel-core/src/graphql_api/worker_service/tests.rs index b6eef2c7826..8b9ad758975 100644 --- a/crates/fuel-core/src/graphql_api/worker_service/tests.rs +++ b/crates/fuel-core/src/graphql_api/worker_service/tests.rs @@ -81,6 +81,7 @@ fn worker_task_with_block_importer_and_db( block_importer, database, chain_id, + 
da_compression_config: DaCompressionConfig::Disabled, continue_on_error: false, } } diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index f72b4c348b0..8303122eb45 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -6,8 +6,13 @@ use crate::{ CoinConfigGenerator, }, combined_database::CombinedDatabase, - database::Database, + database::{ + database_description::off_chain::OffChain, + Database, + }, + fuel_core_graphql_api::storage::transactions::TransactionStatuses, p2p::Multiaddr, + schema::tx::types::TransactionStatus, service::{ Config, FuelService, @@ -33,7 +38,6 @@ use fuel_core_poa::{ Trigger, }; use fuel_core_storage::{ - tables::Transactions, transactional::AtomicView, StorageAsRef, }; @@ -59,7 +63,6 @@ use fuel_core_types::{ services::p2p::GossipsubMessageAcceptance, }; use futures::StreamExt; -use itertools::Itertools; use rand::{ rngs::StdRng, SeedableRng, @@ -491,24 +494,34 @@ impl Node { /// Wait for the node to reach consistency with the given transactions. pub async fn consistency(&mut self, txs: &HashMap) { - let Self { db, .. } = self; - let mut blocks = self.node.shared.block_importer.block_stream(); - while !not_found_txs(db, txs).is_empty() { - tokio::select! { - result = blocks.next() => { - result.unwrap(); - } - _ = self.node.await_shutdown() => { - panic!("Got a stop signal") + let db = self.node.shared.database.off_chain(); + loop { + let not_found = not_found_txs(db, txs); + + if not_found.is_empty() { + break; + } + + let tx_id = not_found[0]; + let mut wait_transaction = + self.node.transaction_status_change(tx_id).unwrap(); + + loop { + tokio::select! { + result = wait_transaction.next() => { + let status = result.unwrap().unwrap(); + + if matches!(status, TransactionStatus::Failed { .. }) + || matches!(status, TransactionStatus::Success { .. 
}) { + break + } + } + _ = self.node.await_shutdown() => { + panic!("Got a stop signal") + } } } } - - let count = db - .all_transactions(None, None) - .filter_ok(|tx| tx.is_script()) - .count(); - assert_eq!(count, txs.len()); } /// Wait for the node to reach consistency with the given transactions within 10 seconds. @@ -570,13 +583,17 @@ impl Node { } fn not_found_txs<'iter>( - db: &'iter Database, + db: &'iter Database, txs: &'iter HashMap, ) -> Vec { let mut not_found = vec![]; txs.iter().for_each(|(id, tx)| { assert_eq!(id, &tx.id(&Default::default())); - if !db.storage::().contains_key(id).unwrap() { + let found = db + .storage::() + .contains_key(id) + .unwrap(); + if !found { not_found.push(*id); } }); diff --git a/crates/fuel-core/src/query.rs b/crates/fuel-core/src/query.rs index c5d3d0f6988..fc2dc79ea9b 100644 --- a/crates/fuel-core/src/query.rs +++ b/crates/fuel-core/src/query.rs @@ -9,6 +9,8 @@ mod subscriptions; mod tx; mod upgrades; +pub mod da_compressed; + // TODO: Remove reexporting of everything pub use balance::*; pub use blob::*; diff --git a/crates/fuel-core/src/query/da_compressed.rs b/crates/fuel-core/src/query/da_compressed.rs new file mode 100644 index 00000000000..669e55d584e --- /dev/null +++ b/crates/fuel-core/src/query/da_compressed.rs @@ -0,0 +1,16 @@ +use crate::graphql_api::ports::DatabaseDaCompressedBlocks; +use fuel_core_storage::Result as StorageResult; +use fuel_core_types::fuel_types::BlockHeight; + +pub trait DaCompressedBlockData: Send + Sync { + fn da_compressed_block(&self, id: &BlockHeight) -> StorageResult>; +} + +impl DaCompressedBlockData for D +where + D: DatabaseDaCompressedBlocks + ?Sized + Send + Sync, +{ + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { + self.da_compressed_block(height) + } +} diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index 258de7fc7be..350004ba31d 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ 
b/crates/fuel-core/src/query/message/test.rs @@ -1,6 +1,5 @@ use std::ops::Deref; -use fuel_core_txpool::types::ContractId; use fuel_core_types::{ blockchain::header::{ ApplicationHeader, @@ -9,11 +8,14 @@ use fuel_core_types::{ }, entities::relayer::message::MerkleProof, fuel_tx::{ - AssetId, Script, Transaction, }, - fuel_types::BlockHeight, + fuel_types::{ + AssetId, + BlockHeight, + ContractId, + }, tai64::Tai64, }; diff --git a/crates/fuel-core/src/schema.rs b/crates/fuel-core/src/schema.rs index 747f2740151..bd9e550d448 100644 --- a/crates/fuel-core/src/schema.rs +++ b/crates/fuel-core/src/schema.rs @@ -32,6 +32,7 @@ pub mod block; pub mod chain; pub mod coins; pub mod contract; +pub mod da_compressed; pub mod dap; pub mod health; pub mod message; @@ -54,6 +55,7 @@ pub struct Query( tx::TxQuery, health::HealthQuery, coins::CoinQuery, + da_compressed::DaCompressedBlockQuery, contract::ContractQuery, contract::ContractBalanceQuery, node_info::NodeQuery, diff --git a/crates/fuel-core/src/schema/da_compressed.rs b/crates/fuel-core/src/schema/da_compressed.rs new file mode 100644 index 00000000000..3af336f8ba9 --- /dev/null +++ b/crates/fuel-core/src/schema/da_compressed.rs @@ -0,0 +1,51 @@ +use super::{ + scalars::HexString, + ReadViewProvider, +}; +use crate::{ + fuel_core_graphql_api::{ + IntoApiResult, + QUERY_COSTS, + }, + query::da_compressed::DaCompressedBlockData, + schema::scalars::U32, +}; +use async_graphql::{ + Context, + Object, +}; + +pub struct DaCompressedBlock { + bytes: Vec, +} + +impl From> for DaCompressedBlock { + fn from(bytes: Vec) -> Self { + Self { bytes } + } +} + +#[Object] +impl DaCompressedBlock { + async fn bytes(&self) -> HexString { + HexString(self.bytes.clone()) + } +} + +#[derive(Default)] +pub struct DaCompressedBlockQuery; + +#[Object] +impl DaCompressedBlockQuery { + #[graphql(complexity = "QUERY_COSTS.da_compressed_block_read")] + async fn da_compressed_block( + &self, + ctx: &Context<'_>, + #[graphql(desc = "Height of the 
block")] height: U32, + ) -> async_graphql::Result> { + let query = ctx.read_view()?; + query + .da_compressed_block(&height.0.into()) + .into_api_result() + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index fdd1d3183bc..092c83da438 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -11,6 +11,7 @@ use crate::{ }, storage::{ contracts::ContractsInfo, + da_compression::DaCompressedBlocks, relayed_transactions::RelayedTransactionStatuses, transactions::OwnedTransactionIndexCursor, }, @@ -22,13 +23,17 @@ use crate::{ }, }; use fuel_core_storage::{ + blueprint::BlueprintInspect, + codec::Encode, iter::{ BoxedIter, IntoBoxedIter, IterDirection, IteratorOverTable, }, + kv_store::KeyValueInspect, not_found, + structured_storage::TableWithBlueprint, transactional::{ IntoTransaction, StorageTransaction, @@ -69,6 +74,19 @@ impl OffChainDatabase for OffChainIterableKeyValueView { .and_then(|height| height.ok_or(not_found!("BlockHeight"))) } + fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult> { + let column = ::column(); + let encoder = + <::Blueprint as BlueprintInspect< + DaCompressedBlocks, + Self, + >>::KeyCodec::encode(height); + + self.get(encoder.as_ref(), column)? 
+ .ok_or_else(|| not_found!(DaCompressedBlocks)) + .map(|value| value.as_ref().clone()) + } + fn tx_status(&self, tx_id: &TxId) -> StorageResult { self.get_tx_status(tx_id) .transpose() diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 5ceb8c1f663..06a94448dca 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -31,7 +31,10 @@ use fuel_core_types::blockchain::header::StateTransitionBytecodeVersion; use crate::{ combined_database::CombinedDatabaseConfig, - graphql_api::ServiceConfig as GraphQLConfig, + graphql_api::{ + worker_service::DaCompressionConfig, + ServiceConfig as GraphQLConfig, + }, }; #[derive(Clone, Debug)] @@ -57,6 +60,7 @@ pub struct Config { pub gas_price_change_percent: u64, pub min_gas_price: u64, pub gas_price_threshold_percent: u64, + pub da_compression: DaCompressionConfig, pub block_importer: fuel_core_importer::Config, #[cfg(feature = "relayer")] pub relayer: Option, @@ -158,6 +162,7 @@ impl Config { block_producer: fuel_core_producer::Config { ..Default::default() }, + da_compression: DaCompressionConfig::Disabled, starting_gas_price, gas_price_change_percent, min_gas_price, diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 93686950f21..82864d73fd9 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -289,6 +289,7 @@ pub fn init_sub_services( database.on_chain().clone(), database.off_chain().clone(), chain_id, + config.da_compression.clone(), config.continue_on_error, ); diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index fbb71e24327..8750f3ae8b0 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -28,7 +28,7 @@ impl-tools = "0.10" itertools = { workspace = true, features = ["use_alloc"] } mockall = { workspace = true, optional = true } num_enum = { workspace = true } -paste = "1" +paste 
= { workspace = true } postcard = { workspace = true, features = ["alloc"] } primitive-types = { workspace = true, default-features = false } rand = { workspace = true, optional = true } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index ee971992a06..8fd5a0a9b80 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -70,7 +70,7 @@ pub enum Error { #[display(fmt = "error occurred in the underlying datastore `{_0:?}`")] DatabaseError(Box), /// This error should be created with `not_found` macro. - #[display(fmt = "resource of type `{_0}` was not found at the: {_1}")] + #[display(fmt = "resource was not found in table `{_0}` at the: {_1}")] NotFound(&'static str, &'static str), // TODO: Do we need this type at all? /// Unknown or not expected(by architecture) error. @@ -194,7 +194,7 @@ macro_rules! not_found { }; ($ty: path) => { $crate::Error::NotFound( - ::core::any::type_name::<<$ty as $crate::Mappable>::OwnedValue>(), + ::core::any::type_name::<$ty>(), concat!(file!(), ":", line!()), ) }; @@ -209,12 +209,12 @@ mod test { #[rustfmt::skip] assert_eq!( format!("{}", not_found!("BlockId")), - format!("resource of type `BlockId` was not found at the: {}:{}", file!(), line!() - 1) + format!("resource was not found in table `BlockId` at the: {}:{}", file!(), line!() - 1) ); #[rustfmt::skip] assert_eq!( format!("{}", not_found!(Coins)), - format!("resource of type `fuel_core_types::entities::coins::coin::CompressedCoin` was not found at the: {}:{}", file!(), line!() - 1) + format!("resource was not found in table `fuel_core_storage::tables::Coins` at the: {}:{}", file!(), line!() - 1) ); } } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index e889d03c8ba..58b9268e6c1 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -34,6 +34,7 @@ zeroize = "1.5" default = ["std"] alloc = ["fuel-vm-private/alloc"] serde = ["dep:serde", "fuel-vm-private/serde"] +da-compression = ["fuel-vm-private/da-compression"] 
std = ["alloc", "fuel-vm-private/std", "bs58"] random = ["dep:rand", "fuel-vm-private/random"] test-helpers = ["random", "fuel-vm-private/test-helpers"] diff --git a/crates/types/src/blockchain/block.rs b/crates/types/src/blockchain/block.rs index d89a5e03682..7611082f232 100644 --- a/crates/types/src/blockchain/block.rs +++ b/crates/types/src/blockchain/block.rs @@ -59,7 +59,8 @@ pub struct BlockV1 { transactions: Vec, } -/// Compressed version of the fuel `Block`. +/// Fuel `Block` with transactions represented by their id only. +/// Note that this is different from the DA compressed blocks. pub type CompressedBlock = Block; /// Fuel block with all transaction data included @@ -68,7 +69,7 @@ pub type CompressedBlock = Block; /// transactions to produce a [`Block`] or /// it can be created with pre-executed transactions in /// order to validate they were constructed correctly. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct PartialFuelBlock { /// The partial header. pub header: PartialBlockHeader, @@ -121,6 +122,7 @@ impl Block { } /// Compresses the fuel block and replaces transactions with hashes. + /// Note that this is different from the DA compression process. pub fn compress(&self, chain_id: &ChainId) -> CompressedBlock { match self { Block::V1(inner) => { @@ -184,6 +186,13 @@ impl Block { } } + /// Get the executed transactions as a `Vec` type. + pub fn transactions_vec(&self) -> &Vec { + match self { + Block::V1(inner) => &inner.transactions, + } + } + /// Get the complete header. 
pub fn header(&self) -> &BlockHeader { match self { diff --git a/crates/types/src/blockchain/header.rs b/crates/types/src/blockchain/header.rs index eecf4dbbaee..e487502c2bc 100644 --- a/crates/types/src/blockchain/header.rs +++ b/crates/types/src/blockchain/header.rs @@ -148,9 +148,9 @@ impl BlockHeader { } } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -#[cfg_attr(any(test, feature = "test-helpers"), derive(Default))] +#[derive(Default)] /// A partially complete fuel block header that does not /// have any generated fields because it has not been executed yet. pub struct PartialBlockHeader { @@ -188,7 +188,6 @@ pub struct ApplicationHeader { pub generated: Generated, } -#[cfg(any(test, feature = "test-helpers"))] impl Default for ApplicationHeader where Generated: Default, @@ -543,7 +542,6 @@ impl ConsensusHeader { } } -#[cfg(any(test, feature = "test-helpers"))] impl Default for ConsensusHeader where T: Default, diff --git a/crates/types/src/blockchain/primitives.rs b/crates/types/src/blockchain/primitives.rs index 768c28ced91..6d98e63e75d 100644 --- a/crates/types/src/blockchain/primitives.rs +++ b/crates/types/src/blockchain/primitives.rs @@ -28,9 +28,9 @@ use zeroize::Zeroize; #[cfg(feature = "alloc")] use alloc::vec::Vec; -#[derive(Clone, Copy, Debug, Default)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] /// Empty generated fields. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Empty; /// A cryptographically secure hash, identifying a block. 
diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 85e38aa8767..1f3d40ac444 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -12,6 +12,9 @@ #[cfg(feature = "alloc")] extern crate alloc; +#[doc(no_inline)] +#[cfg(feature = "da-compression")] +pub use fuel_vm_private::fuel_compression; #[doc(no_inline)] pub use fuel_vm_private::{ fuel_asm, diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 2d60439028f..57f0fe20d9b 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -34,6 +34,7 @@ fuel-core = { path = "../crates/fuel-core", default-features = false, features = fuel-core-benches = { path = "../benches" } fuel-core-bin = { path = "../bin/fuel-core", features = ["parquet", "p2p"] } fuel-core-client = { path = "../crates/client", features = ["test-helpers"] } +fuel-core-compression = { path = "../crates/compression" } fuel-core-executor = { workspace = true } fuel-core-gas-price-service = { path = "../crates/services/gas_price_service" } fuel-core-p2p = { path = "../crates/services/p2p", features = [ @@ -56,6 +57,7 @@ hyper = { workspace = true, features = ["server"] } insta = { workspace = true } itertools = { workspace = true } k256 = { version = "0.13.3", features = ["ecdsa-core"] } +postcard = { workspace = true } primitive-types = { workspace = true, default-features = false } rand = { workspace = true } reqwest = { workspace = true } diff --git a/tests/tests/da_compression.rs b/tests/tests/da_compression.rs new file mode 100644 index 00000000000..1aadd50728e --- /dev/null +++ b/tests/tests/da_compression.rs @@ -0,0 +1,118 @@ +use core::time::Duration; +use fuel_core::{ + combined_database::CombinedDatabase, + fuel_core_graphql_api::worker_service::DaCompressionConfig, + p2p_test_helpers::*, + service::{ + Config, + FuelService, + }, +}; +use fuel_core_client::client::{ + types::TransactionStatus, + FuelClient, +}; +use fuel_core_compression::VersionedCompressedBlock; +use fuel_core_poa::signer::SignMode; +use 
fuel_core_types::{ + fuel_asm::{ + op, + RegId, + }, + fuel_crypto::SecretKey, + fuel_tx::{ + GasCosts, + Input, + TransactionBuilder, + }, + secrecy::Secret, +}; +use rand::{ + rngs::StdRng, + SeedableRng, +}; + +#[tokio::test] +async fn can_fetch_da_compressed_block_from_graphql() { + let mut rng = StdRng::seed_from_u64(10); + let poa_secret = SecretKey::random(&mut rng); + + let db = CombinedDatabase::default(); + let mut config = Config::local_node(); + config.consensus_signer = SignMode::Key(Secret::new(poa_secret.into())); + let compression_config = fuel_core_compression::Config { + temporal_registry_retention: Duration::from_secs(3600), + }; + config.da_compression = DaCompressionConfig::Enabled(compression_config); + let srv = FuelService::from_combined_database(db.clone(), config) + .await + .unwrap(); + let client = FuelClient::from(srv.bound_address); + + let tx = + TransactionBuilder::script([op::ret(RegId::ONE)].into_iter().collect(), vec![]) + .max_fee_limit(0) + .script_gas_limit(1_000_000) + .with_gas_costs(GasCosts::free()) + .add_random_fee_input() + .finalize_as_transaction(); + + let status = client.submit_and_await_commit(&tx).await.unwrap(); + + let block_height = match status { + TransactionStatus::Success { block_height, .. } => block_height, + other => { + panic!("unexpected result {other:?}") + } + }; + + let block = client.da_compressed_block(block_height).await.unwrap(); + let block = block.expect("Unable to get compressed block"); + let _: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn da_compressed_blocks_are_available_from_non_block_producing_nodes() { + let mut rng = StdRng::seed_from_u64(line!() as u64); + + // Create a producer and a validator that share the same key pair. 
+ let secret = SecretKey::random(&mut rng); + let pub_key = Input::owner(&secret.public_key()); + + let mut config = Config::local_node(); + config.da_compression = DaCompressionConfig::Enabled(fuel_core_compression::Config { + temporal_registry_retention: Duration::from_secs(3600), + }); + + let Nodes { + mut producers, + mut validators, + bootstrap_nodes: _dont_drop, + } = make_nodes( + [Some(BootstrapSetup::new(pub_key))], + [Some( + ProducerSetup::new(secret).with_txs(1).with_name("Alice"), + )], + [Some(ValidatorSetup::new(pub_key).with_name("Bob"))], + Some(config), + ) + .await; + + let producer = producers.pop().unwrap(); + let mut validator = validators.pop().unwrap(); + + let v_client = FuelClient::from(validator.node.shared.graph_ql.bound_address); + + // Insert some txs + let expected = producer.insert_txs().await; + validator.consistency_20s(&expected).await; + + let block_height = 1u32.into(); + + let block = v_client + .da_compressed_block(block_height) + .await + .unwrap() + .expect("Compressed block not available from validator"); + let _: VersionedCompressedBlock = postcard::from_bytes(&block).unwrap(); +} diff --git a/tests/tests/lib.rs b/tests/tests/lib.rs index e7a3db5bce9..5337e134358 100644 --- a/tests/tests/lib.rs +++ b/tests/tests/lib.rs @@ -8,6 +8,7 @@ mod chain; mod coin; mod coins; mod contract; +mod da_compression; mod dap; mod debugger; mod dos; diff --git a/tests/tests/node_info.rs b/tests/tests/node_info.rs index f84b767e4df..4dda549bb86 100644 --- a/tests/tests/node_info.rs +++ b/tests/tests/node_info.rs @@ -92,8 +92,18 @@ async fn test_peer_info() { // This is just a mock of what we should be able to do with GQL API. let client = producer.node.bound_address; let client = FuelClient::from(client); - let peers = client.connected_peers_info().await.unwrap(); - assert_eq!(peers.len(), 2); + let mut peers; + + // It takes some time before all validators are connected. 
+ loop { + peers = client.connected_peers_info().await.unwrap(); + + if peers.len() == 2 { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + let info = peers .iter() .find(|info| info.id.to_string() == validator_peer_id.to_base58())