diff --git a/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs b/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs index 034a509a3e5..c73e36f3712 100644 --- a/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs +++ b/internal/mithril-persistence/src/database/repository/cardano_transaction_repository.rs @@ -1,5 +1,6 @@ use std::ops::Range; use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use async_trait::async_trait; @@ -10,6 +11,7 @@ use mithril_common::entities::{ BlockHash, BlockNumber, BlockRange, CardanoTransaction, ChainPoint, ImmutableFileNumber, SlotNumber, TransactionHash, }; +use mithril_common::resource_pool::ResourcePool; use mithril_common::signable_builder::BlockRangeRootRetriever; use mithril_common::StdResult; @@ -19,25 +21,31 @@ use crate::database::query::{ InsertCardanoTransactionQuery, }; use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord}; -use crate::sqlite::{ConnectionExtensions, SqliteConnection}; +use crate::sqlite::{ConnectionExtensions, SqliteConnection, SqliteConnectionWrapper}; /// ## Cardano transaction repository /// /// This is a business oriented layer to perform actions on the database through /// queries. pub struct CardanoTransactionRepository { - connection: Arc, + connection_pool: Arc>, } impl CardanoTransactionRepository { /// Instantiate service - pub fn new(connection: Arc) -> Self { - Self { connection } + pub fn new(connection_pool: Arc>) -> Self { + Self { connection_pool } + } + + fn connection(&self) -> StdResult> { + let timeout = Duration::from_millis(1000); + let connection = self.connection_pool.acquire_resource(timeout)?; + Ok((*connection).clone()) } /// Return all the [CardanoTransactionRecord]s in the database. pub async fn get_all_transactions(&self) -> StdResult> { - self.connection + self.connection()? .fetch_collect(GetCardanoTransactionQuery::all()) } @@ -47,7 +55,7 @@ impl CardanoTransactionRepository { &self, range: Range, ) -> StdResult> { - self.connection + self.connection()? .fetch_collect(GetCardanoTransactionQuery::between_blocks(range)) } @@ -56,7 +64,7 @@ impl CardanoTransactionRepository { &self, transaction_hash: T, ) -> StdResult> { - self.connection + self.connection()? .fetch_first(GetCardanoTransactionQuery::by_transaction_hash( &transaction_hash.into(), )) @@ -79,7 +87,7 @@ impl CardanoTransactionRepository { immutable_file_number, })?; - self.connection.fetch_first(query) + self.connection()?.fetch_first(query) } /// Create new [CardanoTransactionRecord]s in the database. @@ -90,7 +98,7 @@ impl CardanoTransactionRepository { let records: Vec = transactions.into_iter().map(|tx| tx.into()).collect(); - self.connection + self.connection()? .fetch_collect(InsertCardanoTransactionQuery::insert_many(records)?) } @@ -102,14 +110,14 @@ impl CardanoTransactionRepository { let records: Vec = block_ranges.into_iter().map(|tx| tx.into()).collect(); - self.connection + self.connection()? .fetch_collect(InsertBlockRangeRootQuery::insert_many(records)?) } /// Get the highest [ChainPoint] of the cardano transactions stored in the database. pub async fn get_transaction_highest_chain_point(&self) -> StdResult> { let first_transaction_with_highest_block_number = self - .connection + .connection()? .fetch_first(GetCardanoTransactionQuery::with_highest_block_number())?; Ok(first_transaction_with_highest_block_number.map(|record| { @@ -122,7 +130,7 @@ impl CardanoTransactionRepository { &self, ) -> StdResult> { let highest: Option = self - .connection + .connection()? .query_single_cell("select max(start) as highest from block_range_root;", &[])?; highest .map(u64::try_from) @@ -138,30 +146,36 @@ impl CardanoTransactionRepository { end_block_number: BlockNumber, ) -> StdResult + '_>> { let block_range_roots = self - .connection + .connection()? .fetch(GetBlockRangeRootQuery::up_to_block_number(end_block_number))? - .map(|record| -> (BlockRange, MKTreeNode) { record.into() }); + .map(|record| -> (BlockRange, MKTreeNode) { record.into() }) + .collect::>(); // TODO: remove this collect to return the iterator directly - Ok(Box::new(block_range_roots)) + Ok(Box::new(block_range_roots.into_iter())) } /// Retrieve all the [CardanoTransaction] in database. pub async fn get_all(&self) -> StdResult> { - let records = self.connection.fetch(GetCardanoTransactionQuery::all())?; + let records = self + .connection()? + .fetch(GetCardanoTransactionQuery::all())? + .map(|record| record.into()) + .collect(); - Ok(records.map(|record| record.into()).collect()) + Ok(records) } /// Retrieve all the [BlockRangeRootRecord] in database. pub fn get_all_block_range_root(&self) -> StdResult> { - self.connection.fetch_collect(GetBlockRangeRootQuery::all()) + self.connection()? + .fetch_collect(GetBlockRangeRootQuery::all()) } /// Get the highest [ImmutableFileNumber] of the cardano transactions stored in the database. pub async fn get_transaction_highest_immutable_file_number( &self, ) -> StdResult> { - let highest: Option = self.connection.query_single_cell( + let highest: Option = self.connection()?.query_single_cell( "select max(immutable_file_number) as highest from cardano_tx;", &[], )?; @@ -182,7 +196,8 @@ impl CardanoTransactionRepository { ) -> StdResult<()> { const DB_TRANSACTION_SIZE: usize = 100000; for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) { - let transaction = self.connection.begin_transaction()?; + let connection = self.connection()?; + let transaction = connection.begin_transaction()?; // Chunk transactions to avoid an error when we exceed sqlite binding limitations for transactions_in_chunk in transactions_in_db_transaction_chunk.chunks(100) { @@ -201,7 +216,7 @@ impl CardanoTransactionRepository { &self, ) -> StdResult>> { let interval = self - .connection + .connection()? .fetch_first(GetIntervalWithoutBlockRangeRootQuery::new())? // Should be impossible - the request as written in the query always returns a single row .unwrap_or_else(|| { @@ -221,7 +236,7 @@ impl CardanoTransactionRepository { hashes.into_iter().map(Into::into).collect(), up_to, ); - self.connection.fetch_collect(query) + self.connection()?.fetch_collect(query) } /// Get the [CardanoTransactionRecord] for the given block ranges. @@ -232,7 +247,7 @@ impl CardanoTransactionRepository { let mut transactions = vec![]; for block_range in block_ranges { let block_range_transactions: Vec = self - .connection + .connection()? .fetch_collect(GetCardanoTransactionQuery::by_block_ranges(vec![ block_range, ]))?; @@ -251,7 +266,7 @@ impl CardanoTransactionRepository { { let threshold = highest_block_range_start.saturating_sub(number_of_blocks_to_keep); let query = DeleteCardanoTransactionQuery::below_block_number_threshold(threshold)?; - self.connection.fetch_first(query)?; + self.connection()?.fetch_first(query)?; } Ok(()) @@ -292,7 +307,11 @@ mod tests { #[tokio::test] async fn repository_create_and_get_transaction() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository .create_transactions(vec![ CardanoTransaction::new("tx_hash-123", 10, 50, "block_hash-123", 99), @@ -323,7 +342,11 @@ mod tests { #[tokio::test] async fn repository_get_transaction_by_hashes() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository .create_transactions(vec![ CardanoTransactionRecord::new("tx_hash-123", 10, 50, "block_hash-123", 1234), @@ -390,7 +413,11 @@ mod tests { #[tokio::test] async fn repository_create_ignore_further_transactions_when_exists() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository .create_transaction("tx-hash-123", 10, 50, "block_hash-123", 99) .await @@ -416,7 +443,11 @@ mod tests { #[tokio::test] async fn repository_store_transactions_and_get_stored_transactions() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123", 10, 50, "block-hash-123", 99), @@ -457,7 +488,11 @@ mod tests { #[tokio::test] async fn repository_get_all_stored_transactions() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123".to_string(), 10, 50, "block-hash-123", 99), @@ -480,7 +515,11 @@ mod tests { #[tokio::test] async fn repository_store_transactions_doesnt_erase_existing_data() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository .create_transaction("tx-hash-000", 1, 5, "block-hash", 9) @@ -516,7 +555,11 @@ mod tests { #[tokio::test] async fn repository_get_transaction_highest_chain_point_without_transactions_in_db() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let highest_beacon = repository .get_transaction_highest_chain_point() @@ -528,7 +571,11 @@ mod tests { #[tokio::test] async fn repository_get_transaction_highest_chain_point_with_transactions_in_db() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123", 10, 50, "block-hash-10", 50), @@ -557,7 +604,11 @@ mod tests { async fn repository_get_transaction_highest_chain_point_with_transactions_with_same_block_number_in_db( ) { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123", 10, 50, "block-hash-10", 50), @@ -586,7 +637,11 @@ mod tests { #[tokio::test] async fn repository_get_transaction_highest_immutable_file_number_without_transactions_in_db() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let highest_beacon = repository .get_transaction_highest_immutable_file_number() @@ -598,7 +653,11 @@ mod tests { #[tokio::test] async fn repository_get_transaction_highest_immutable_file_number_with_transactions_in_db() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123".to_string(), 10, 50, "block-hash-123", 50), @@ -619,7 +678,11 @@ mod tests { #[tokio::test] async fn repository_get_transactions_in_range_blocks() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let transactions = vec![ CardanoTransactionRecord::new("tx-hash-1", 10, 50, "block-hash-1", 99), @@ -671,7 +734,11 @@ mod tests { #[tokio::test] async fn repository_get_block_interval_without_block_range_root() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); // The last block range give the lower bound let last_block_range = BlockRange::from_block_number(0); @@ -704,7 +771,11 @@ mod tests { #[tokio::test] async fn repository_get_transactions_by_block_ranges() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let transactions = vec![ CardanoTransactionRecord::new("tx-hash-1", 10, 50, "block-hash-1", 99), @@ -761,7 +832,11 @@ mod tests { #[tokio::test] async fn repository_store_block_range() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection.clone()); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection.clone())], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository .create_block_range_roots(vec![ @@ -798,7 +873,11 @@ mod tests { #[tokio::test] async fn repository_store_block_range_with_existing_hash_doesnt_erase_existing_data() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection.clone()); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection.clone())], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let range = BlockRange::from_block_number(0); repository @@ -825,7 +904,11 @@ mod tests { #[tokio::test] async fn repository_retrieve_block_range_roots_up_to() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let block_range_roots = vec![ ( BlockRange::from_block_number(15), @@ -894,7 +977,11 @@ mod tests { #[tokio::test] async fn repository_prune_transactions() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions: Vec = CardanoTransactionsBuilder::new() .blocks_per_block_range(15) @@ -944,7 +1031,11 @@ mod tests { #[tokio::test] async fn get_highest_start_block_number_for_block_range_roots() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let highest = repository .get_highest_start_block_number_for_block_range_roots() @@ -977,7 +1068,11 @@ mod tests { #[tokio::test] async fn find_block_scanner_lower_bound() { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); let cardano_transactions = vec![ CardanoTransaction::new("tx-hash-123".to_string(), 10, 50, "block-hash-123", 50), diff --git a/internal/mithril-persistence/src/sqlite/mod.rs b/internal/mithril-persistence/src/sqlite/mod.rs index 191dde3601e..5eb355bbac0 100644 --- a/internal/mithril-persistence/src/sqlite/mod.rs +++ b/internal/mithril-persistence/src/sqlite/mod.rs @@ -12,6 +12,8 @@ mod query; mod source_alias; mod transaction; +use std::{ops::Deref, sync::Arc}; + pub use condition::{GetAllCondition, WhereCondition}; pub use connection_builder::{ConnectionBuilder, ConnectionOptions}; pub use connection_extensions::ConnectionExtensions; @@ -22,7 +24,7 @@ pub use query::Query; pub use source_alias::SourceAlias; pub use transaction::Transaction; -use mithril_common::StdResult; +use mithril_common::{resource_pool::Reset, StdResult}; use sqlite::ConnectionThreadSafe; /// Type of the connection used in Mithril @@ -36,6 +38,26 @@ pub async fn vacuum_database(connection: &SqliteConnection) -> StdResult<()> { Ok(()) } +/// SqliteConnection wrapper +pub struct SqliteConnectionWrapper(Arc); + +impl SqliteConnectionWrapper { + /// Create a new SqliteConnectionWrapper + pub fn new(connection: Arc) -> Self { + Self(connection) + } +} + +impl Deref for SqliteConnectionWrapper { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Reset for SqliteConnectionWrapper {} + #[cfg(test)] mod test { use crate::sqlite::vacuum_database; diff --git a/mithril-aggregator/benches/cardano_transactions_get.rs b/mithril-aggregator/benches/cardano_transactions_get.rs index 5a1e47bfd23..1483d5f0724 100644 --- a/mithril-aggregator/benches/cardano_transactions_get.rs +++ b/mithril-aggregator/benches/cardano_transactions_get.rs @@ -1,12 +1,13 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use mithril_common::resource_pool::ResourcePool; use sqlite::ConnectionThreadSafe; use mithril_aggregator::services::TransactionStore; use mithril_common::{entities::CardanoTransaction, test_utils::TempDir}; use mithril_persistence::database::repository::CardanoTransactionRepository; -use mithril_persistence::sqlite::ConnectionBuilder; +use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnectionWrapper}; fn cardano_tx_db_connection(db_file_name: &str) -> ConnectionThreadSafe { let db_path = @@ -47,7 +48,11 @@ async fn init_db(nb_transaction_in_db: usize) -> CardanoTransactionRepository { let connection = Arc::new(cardano_tx_db_connection(&format!( "cardano_tx-{nb_transaction_in_db}.db", ))); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository.store_transactions(transactions).await.unwrap(); repository diff --git a/mithril-aggregator/benches/cardano_transactions_import.rs b/mithril-aggregator/benches/cardano_transactions_import.rs index 75621c0e921..f79950f1a59 100644 --- a/mithril-aggregator/benches/cardano_transactions_import.rs +++ b/mithril-aggregator/benches/cardano_transactions_import.rs @@ -1,10 +1,11 @@ use criterion::{criterion_group, criterion_main, Criterion}; +use mithril_common::resource_pool::ResourcePool; use sqlite::ConnectionThreadSafe; use std::sync::Arc; use mithril_common::{entities::CardanoTransaction, test_utils::TempDir}; use mithril_persistence::database::repository::CardanoTransactionRepository; -use mithril_persistence::sqlite::ConnectionBuilder; +use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnectionWrapper}; fn cardano_tx_db_connection() -> ConnectionThreadSafe { let db_path = @@ -45,7 +46,11 @@ fn bench_store_transactions(c: &mut Criterion) { group.bench_function("store_transactions", |bencher| { bencher.to_async(&runtime).iter(|| async { let connection = Arc::new(cardano_tx_db_connection()); - let repository = CardanoTransactionRepository::new(connection); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool); repository.store_transactions(transactions.clone()).await }); }); diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index a893ebf9b88..f08c0608aab 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -29,6 +29,7 @@ use mithril_common::{ adapters::{EraReaderAdapterBuilder, EraReaderDummyAdapter}, EraChecker, EraMarker, EraReader, EraReaderAdapter, SupportedEra, }, + resource_pool::ResourcePool, signable_builder::{ CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder, MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder, @@ -38,7 +39,7 @@ use mithril_common::{ }; use mithril_persistence::{ database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration}, - sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection}, + sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionWrapper}, store::adapter::{MemoryAdapter, SQLiteAdapter, StoreAdapter}, }; @@ -96,7 +97,11 @@ pub struct DependenciesBuilder { pub sqlite_connection: Option>, /// Cardano transactions SQLite database connection - pub transaction_sqlite_connection: Option>, + pub sqlite_connection_cardano_transaction: Option>, + + /// Cardano transactions SQLite database connection pool + pub sqlite_connection_cardano_transaction_pool: + Option>>, /// Stake Store used by the StakeDistributionService /// It shall be a private dependency. @@ -218,7 +223,8 @@ impl DependenciesBuilder { configuration, signed_entity_config: None, sqlite_connection: None, - transaction_sqlite_connection: None, + sqlite_connection_cardano_transaction: None, + sqlite_connection_cardano_transaction_pool: None, stake_store: None, snapshot_uploader: None, multi_signer: None, @@ -309,7 +315,7 @@ impl DependenciesBuilder { let _ = connection.execute("pragma analysis_limit=400; pragma optimize;"); } - if let Some(connection) = &self.transaction_sqlite_connection { + if let Some(connection) = &self.sqlite_connection_cardano_transaction { let _ = connection.execute("pragma analysis_limit=400; pragma optimize;"); } } @@ -333,8 +339,8 @@ impl DependenciesBuilder { pub async fn get_sqlite_connection_cardano_transaction( &mut self, ) -> Result> { - if self.transaction_sqlite_connection.is_none() { - self.transaction_sqlite_connection = Some( + if self.sqlite_connection_cardano_transaction.is_none() { + self.sqlite_connection_cardano_transaction = Some( self.build_sqlite_connection( SQLITE_FILE_CARDANO_TRANSACTION, mithril_persistence::database::cardano_transaction_migration::get_migrations(), @@ -344,7 +350,36 @@ impl DependenciesBuilder { } Ok(self - .transaction_sqlite_connection + .sqlite_connection_cardano_transaction + .as_ref() + .cloned() + .unwrap()) + } + + /// Get SQLite connection pool for the cardano transactions store + pub async fn get_sqlite_connection_cardano_transaction_pool( + &mut self, + ) -> Result>> { + let connection_pool_size = 10; // TODO: to be configured + if self.sqlite_connection_cardano_transaction_pool.is_none() { + let mut connections = vec![]; + for _ in 0..connection_pool_size { + let connection = self + .build_sqlite_connection( + SQLITE_FILE_CARDANO_TRANSACTION, + mithril_persistence::database::cardano_transaction_migration::get_migrations(), + ) + .await?; + connections.push(SqliteConnectionWrapper::new(connection)); + } + self.sqlite_connection_cardano_transaction_pool = Some(Arc::new(ResourcePool::new( + connection_pool_size, + connections, + ))); + } + + Ok(self + .sqlite_connection_cardano_transaction_pool .as_ref() .cloned() .unwrap()) @@ -667,7 +702,8 @@ impl DependenciesBuilder { async fn build_transaction_repository(&mut self) -> Result> { let transaction_store = CardanoTransactionRepository::new( - self.get_sqlite_connection_cardano_transaction().await?, + self.get_sqlite_connection_cardano_transaction_pool() + .await?, ); Ok(Arc::new(transaction_store)) @@ -1154,7 +1190,12 @@ impl DependenciesBuilder { config: self.configuration.clone(), signed_entity_config: self.get_signed_entity_config()?, sqlite_connection: self.get_sqlite_connection().await?, - sqlite_connection_transaction: self.get_sqlite_connection_cardano_transaction().await?, + sqlite_connection_cardano_transaction: self + .get_sqlite_connection_cardano_transaction() + .await?, + sqlite_connection_cardano_transaction_pool: self + .get_sqlite_connection_cardano_transaction_pool() + .await?, stake_store: self.get_stake_store().await?, snapshot_uploader: self.get_snapshot_uploader().await?, multi_signer: self.get_multi_signer().await?, diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index 7a59d148461..2818577c912 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -1,3 +1,5 @@ +use mithril_common::resource_pool::ResourcePool; +use mithril_persistence::sqlite::SqliteConnectionWrapper; use std::sync::Arc; use tokio::sync::RwLock; @@ -56,7 +58,10 @@ pub struct DependencyContainer { pub sqlite_connection: Arc, /// SQLite database connection for Cardano transactions - pub sqlite_connection_transaction: Arc, + pub sqlite_connection_cardano_transaction: Arc, + + /// Cardano transactions SQLite database connection pool + pub sqlite_connection_cardano_transaction_pool: Arc>, /// Stake Store used by the StakeDistributionService /// It shall be a private dependency. diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs index 77e2c7f04a7..31342533947 100644 --- a/mithril-aggregator/src/services/cardano_transactions_importer.rs +++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs @@ -174,6 +174,8 @@ impl TransactionsImporter for CardanoTransactionsImporter { #[cfg(test)] mod tests { + use mithril_common::resource_pool::ResourcePool; + use mithril_persistence::sqlite::SqliteConnectionWrapper; use mockall::mock; use mithril_common::cardano_block_scanner::{ @@ -250,7 +252,11 @@ mod tests { #[tokio::test] async fn if_nothing_stored_parse_and_store_all_transactions() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = vec![ ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), @@ -280,7 +286,11 @@ mod tests { #[tokio::test] async fn if_nothing_stored_parse_and_store_all_block_ranges() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = build_blocks(0, BlockRange::LENGTH * 5 + 1); let transactions = into_transactions(&blocks); @@ -315,7 +325,11 @@ mod tests { #[tokio::test] async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); // Two block ranges with a gap let blocks: Vec = [ @@ -352,7 +366,11 @@ mod tests { #[tokio::test] async fn if_all_block_ranges_computed_nothing_computed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let importer = CardanoTransactionsImporter::new_for_test( Arc::new(MockBlockScannerImpl::new()), @@ -375,7 +393,11 @@ mod tests { async fn if_all_transactions_stored_nothing_is_parsed_and_stored() { let up_to_block_number = 12; let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let scanner = DumbBlockScanner::new(vec![ ScannedBlock::new("block_hash-1", 10, 15, 10, vec!["tx_hash-1", "tx_hash-2"]), ScannedBlock::new("block_hash-2", 20, 25, 11, vec!["tx_hash-3", "tx_hash-4"]), @@ -402,7 +424,11 @@ mod tests { #[tokio::test] async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let highest_stored_chain_point = ChainPoint::new(134, 10, "block_hash-1"); let stored_block = ScannedBlock::new( @@ -457,7 +483,11 @@ mod tests { #[tokio::test] async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = build_blocks(0, BlockRange::LENGTH * 4 + 1); let transactions = into_transactions(&blocks); @@ -565,7 +595,11 @@ mod tests { #[tokio::test] async fn compute_block_range_merkle_root() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); // 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation let blocks = build_blocks(0, BlockRange::LENGTH * 2 + 1); @@ -617,10 +651,14 @@ mod tests { let (importer, repository) = { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = Arc::new(CardanoTransactionRepository::new(connection.clone())); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool.clone())); let importer = CardanoTransactionsImporter::new_for_test( Arc::new(DumbBlockScanner::new(blocks.clone())), - Arc::new(CardanoTransactionRepository::new(connection.clone())), + Arc::new(CardanoTransactionRepository::new(connection_pool.clone())), ); (importer, repository) }; diff --git a/mithril-signer/src/cardano_transactions_importer.rs b/mithril-signer/src/cardano_transactions_importer.rs index b853a9146ea..89892bc11ce 100644 --- a/mithril-signer/src/cardano_transactions_importer.rs +++ b/mithril-signer/src/cardano_transactions_importer.rs @@ -174,6 +174,8 @@ impl TransactionsImporter for CardanoTransactionsImporter { #[cfg(test)] mod tests { + use mithril_common::resource_pool::ResourcePool; + use mithril_persistence::sqlite::SqliteConnectionWrapper; use mockall::mock; use mithril_common::cardano_block_scanner::{ @@ -250,7 +252,11 @@ mod tests { #[tokio::test] async fn if_nothing_stored_parse_and_store_all_transactions() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = vec![ ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]), @@ -280,7 +286,11 @@ mod tests { #[tokio::test] async fn if_nothing_stored_parse_and_store_all_block_ranges() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = build_blocks(0, BlockRange::LENGTH * 5 + 1); let transactions = into_transactions(&blocks); @@ -315,7 +325,11 @@ mod tests { #[tokio::test] async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); // Two block ranges with a gap let blocks: Vec = [ @@ -352,7 +366,11 @@ mod tests { #[tokio::test] async fn if_all_block_ranges_computed_nothing_computed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let importer = CardanoTransactionsImporter::new_for_test( Arc::new(MockBlockScannerImpl::new()), @@ -375,7 +393,11 @@ mod tests { async fn if_all_transactions_stored_nothing_is_parsed_and_stored() { let up_to_block_number = 12; let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let scanner = DumbBlockScanner::new(vec![ ScannedBlock::new("block_hash-1", 10, 15, 10, vec!["tx_hash-1", "tx_hash-2"]), ScannedBlock::new("block_hash-2", 20, 25, 11, vec!["tx_hash-3", "tx_hash-4"]), @@ -402,7 +424,11 @@ mod tests { #[tokio::test] async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let highest_stored_chain_point = ChainPoint::new(134, 10, "block_hash-1"); let stored_block = ScannedBlock::new( @@ -457,7 +483,11 @@ mod tests { #[tokio::test] async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); let blocks = build_blocks(0, BlockRange::LENGTH * 4 + 1); let transactions = into_transactions(&blocks); @@ -565,7 +595,11 @@ mod tests { #[tokio::test] async fn compute_block_range_merkle_root() { let connection = cardano_tx_db_connection().unwrap(); - let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection))); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(Arc::new(connection))], + )); + let repository = Arc::new(CardanoTransactionRepository::new(connection_pool)); // 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation let blocks = build_blocks(0, BlockRange::LENGTH * 2 + 1); @@ -617,10 +651,14 @@ mod tests { let (importer, repository) = { let connection = Arc::new(cardano_tx_db_connection().unwrap()); - let repository = Arc::new(CardanoTransactionRepository::new(connection.clone())); + let connection_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(connection)], + )); + let repository = CardanoTransactionRepository::new(connection_pool.clone()); let importer = CardanoTransactionsImporter::new_for_test( Arc::new(DumbBlockScanner::new(blocks.clone())), - Arc::new(CardanoTransactionRepository::new(connection.clone())), + Arc::new(CardanoTransactionRepository::new(connection_pool)), ); (importer, repository) }; diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index 3a2b6d42d3e..5dd1614e164 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -13,6 +13,7 @@ use mithril_common::{ ImmutableFileSystemObserver, }, era::{EraChecker, EraReader}, + resource_pool::ResourcePool, signable_builder::{ CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder, MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder, @@ -22,7 +23,7 @@ use mithril_common::{ }; use mithril_persistence::{ database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration}, - sqlite::{ConnectionBuilder, SqliteConnection}, + sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionWrapper}, store::{adapter::SQLiteAdapter, StakeStore}, }; @@ -200,6 +201,10 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> { mithril_persistence::database::cardano_transaction_migration::get_migrations(), ) .await?; + let sqlite_connection_cardano_transaction_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(transaction_sqlite_connection)], + )); let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new( Box::new(SQLiteAdapter::new( @@ -258,7 +263,7 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> { let mithril_stake_distribution_signable_builder = Arc::new(MithrilStakeDistributionSignableBuilder::default()); let transaction_store = Arc::new(CardanoTransactionRepository::new( - transaction_sqlite_connection, + sqlite_connection_cardano_transaction_pool, )); let block_scanner = Arc::new(CardanoBlockScanner::new( slog_scope::logger(), diff --git a/mithril-signer/tests/test_extensions/state_machine_tester.rs b/mithril-signer/tests/test_extensions/state_machine_tester.rs index ca7f4b512cb..05994923045 100644 --- a/mithril-signer/tests/test_extensions/state_machine_tester.rs +++ b/mithril-signer/tests/test_extensions/state_machine_tester.rs @@ -12,14 +12,17 @@ use mithril_common::{ digesters::{DumbImmutableDigester, DumbImmutableFileObserver, ImmutableFileObserver}, entities::{ChainPoint, Epoch, SignerWithStake, TimePoint}, era::{adapters::EraReaderDummyAdapter, EraChecker, EraMarker, EraReader, SupportedEra}, + resource_pool::ResourcePool, signable_builder::{ CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder, MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder, }, MithrilTickerService, StdError, TickerService, }; -use mithril_persistence::database::repository::CardanoTransactionRepository; use mithril_persistence::store::{adapter::MemoryAdapter, StakeStore, StakeStorer}; +use mithril_persistence::{ + database::repository::CardanoTransactionRepository, sqlite::SqliteConnectionWrapper, +}; use mithril_signer::{ metrics::*, AggregatorClient, CardanoTransactionsImporter, Configuration, MetricsService, @@ -86,6 +89,10 @@ impl StateMachineTester { ) .await .unwrap(); + let sqlite_connection_cardano_transaction_pool = Arc::new(ResourcePool::new( + 1, + vec![SqliteConnectionWrapper::new(transaction_sqlite_connection)], + )); let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); let drain = slog_term::CompactFormat::new(decorator).build().fuse(); @@ -152,7 +159,7 @@ impl StateMachineTester { Arc::new(MithrilStakeDistributionSignableBuilder::default()); let transaction_parser = Arc::new(DumbBlockScanner::new(vec![])); let transaction_store = Arc::new(CardanoTransactionRepository::new( - transaction_sqlite_connection, + sqlite_connection_cardano_transaction_pool, )); let transaction_importer = Arc::new(CardanoTransactionsImporter::new( transaction_parser.clone(),