Skip to content

Commit

Permalink
wip: implement database conenction pooling in Cardano transactions re…
Browse files Browse the repository at this point in the history
…pository
  • Loading branch information
jpraynaud committed Jun 12, 2024
1 parent 198d759 commit b0e9332
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 83 deletions.

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion internal/mithril-persistence/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod query;
mod source_alias;
mod transaction;

use std::{ops::Deref, sync::Arc};

pub use condition::{GetAllCondition, WhereCondition};
pub use connection_builder::{ConnectionBuilder, ConnectionOptions};
pub use connection_extensions::ConnectionExtensions;
Expand All @@ -22,7 +24,7 @@ pub use query::Query;
pub use source_alias::SourceAlias;
pub use transaction::Transaction;

use mithril_common::StdResult;
use mithril_common::{resource_pool::Reset, StdResult};
use sqlite::ConnectionThreadSafe;

/// Type of the connection used in Mithril
Expand All @@ -36,6 +38,26 @@ pub async fn vacuum_database(connection: &SqliteConnection) -> StdResult<()> {
Ok(())
}

/// SqliteConnection wrapper
pub struct SqliteConnectionWrapper(Arc<SqliteConnection>);

impl SqliteConnectionWrapper {
/// Create a new SqliteConnectionWrapper
pub fn new(connection: Arc<SqliteConnection>) -> Self {
Self(connection)
}
}

impl Deref for SqliteConnectionWrapper {
type Target = Arc<SqliteConnection>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Reset for SqliteConnectionWrapper {}

#[cfg(test)]
mod test {
use crate::sqlite::vacuum_database;
Expand Down
9 changes: 7 additions & 2 deletions mithril-aggregator/benches/cardano_transactions_get.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use mithril_common::resource_pool::ResourcePool;
use sqlite::ConnectionThreadSafe;

use mithril_aggregator::services::TransactionStore;
use mithril_common::{entities::CardanoTransaction, test_utils::TempDir};
use mithril_persistence::database::repository::CardanoTransactionRepository;
use mithril_persistence::sqlite::ConnectionBuilder;
use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnectionWrapper};

fn cardano_tx_db_connection(db_file_name: &str) -> ConnectionThreadSafe {
let db_path =
Expand Down Expand Up @@ -47,7 +48,11 @@ async fn init_db(nb_transaction_in_db: usize) -> CardanoTransactionRepository {
let connection = Arc::new(cardano_tx_db_connection(&format!(
"cardano_tx-{nb_transaction_in_db}.db",
)));
let repository = CardanoTransactionRepository::new(connection);
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(connection)],
));
let repository = CardanoTransactionRepository::new(connection_pool);
repository.store_transactions(transactions).await.unwrap();

repository
Expand Down
9 changes: 7 additions & 2 deletions mithril-aggregator/benches/cardano_transactions_import.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use criterion::{criterion_group, criterion_main, Criterion};
use mithril_common::resource_pool::ResourcePool;
use sqlite::ConnectionThreadSafe;
use std::sync::Arc;

use mithril_common::{entities::CardanoTransaction, test_utils::TempDir};
use mithril_persistence::database::repository::CardanoTransactionRepository;
use mithril_persistence::sqlite::ConnectionBuilder;
use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnectionWrapper};

fn cardano_tx_db_connection() -> ConnectionThreadSafe {
let db_path =
Expand Down Expand Up @@ -45,7 +46,11 @@ fn bench_store_transactions(c: &mut Criterion) {
group.bench_function("store_transactions", |bencher| {
bencher.to_async(&runtime).iter(|| async {
let connection = Arc::new(cardano_tx_db_connection());
let repository = CardanoTransactionRepository::new(connection);
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(connection)],
));
let repository = CardanoTransactionRepository::new(connection_pool);
repository.store_transactions(transactions.clone()).await
});
});
Expand Down
59 changes: 50 additions & 9 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use mithril_common::{
adapters::{EraReaderAdapterBuilder, EraReaderDummyAdapter},
EraChecker, EraMarker, EraReader, EraReaderAdapter, SupportedEra,
},
resource_pool::ResourcePool,
signable_builder::{
CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder,
MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
Expand All @@ -38,7 +39,7 @@ use mithril_common::{
};
use mithril_persistence::{
database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration},
sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection},
sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionWrapper},
store::adapter::{MemoryAdapter, SQLiteAdapter, StoreAdapter},
};

Expand Down Expand Up @@ -96,7 +97,11 @@ pub struct DependenciesBuilder {
pub sqlite_connection: Option<Arc<SqliteConnection>>,

/// Cardano transactions SQLite database connection
pub transaction_sqlite_connection: Option<Arc<SqliteConnection>>,
pub sqlite_connection_cardano_transaction: Option<Arc<SqliteConnection>>,

/// Cardano transactions SQLite database connection pool
pub sqlite_connection_cardano_transaction_pool:
Option<Arc<ResourcePool<SqliteConnectionWrapper>>>,

/// Stake Store used by the StakeDistributionService
/// It shall be a private dependency.
Expand Down Expand Up @@ -218,7 +223,8 @@ impl DependenciesBuilder {
configuration,
signed_entity_config: None,
sqlite_connection: None,
transaction_sqlite_connection: None,
sqlite_connection_cardano_transaction: None,
sqlite_connection_cardano_transaction_pool: None,
stake_store: None,
snapshot_uploader: None,
multi_signer: None,
Expand Down Expand Up @@ -309,7 +315,7 @@ impl DependenciesBuilder {
let _ = connection.execute("pragma analysis_limit=400; pragma optimize;");
}

if let Some(connection) = &self.transaction_sqlite_connection {
if let Some(connection) = &self.sqlite_connection_cardano_transaction {
let _ = connection.execute("pragma analysis_limit=400; pragma optimize;");
}
}
Expand All @@ -333,8 +339,8 @@ impl DependenciesBuilder {
pub async fn get_sqlite_connection_cardano_transaction(
&mut self,
) -> Result<Arc<SqliteConnection>> {
if self.transaction_sqlite_connection.is_none() {
self.transaction_sqlite_connection = Some(
if self.sqlite_connection_cardano_transaction.is_none() {
self.sqlite_connection_cardano_transaction = Some(
self.build_sqlite_connection(
SQLITE_FILE_CARDANO_TRANSACTION,
mithril_persistence::database::cardano_transaction_migration::get_migrations(),
Expand All @@ -344,7 +350,36 @@ impl DependenciesBuilder {
}

Ok(self
.transaction_sqlite_connection
.sqlite_connection_cardano_transaction
.as_ref()
.cloned()
.unwrap())
}

/// Get SQLite connection pool for the cardano transactions store
pub async fn get_sqlite_connection_cardano_transaction_pool(
&mut self,
) -> Result<Arc<ResourcePool<SqliteConnectionWrapper>>> {
let connection_pool_size = 10; // TODO: to be configured
if self.sqlite_connection_cardano_transaction_pool.is_none() {
let mut connections = vec![];
for _ in 0..connection_pool_size {
let connection = self
.build_sqlite_connection(
SQLITE_FILE_CARDANO_TRANSACTION,
mithril_persistence::database::cardano_transaction_migration::get_migrations(),
)
.await?;
connections.push(SqliteConnectionWrapper::new(connection));
}
self.sqlite_connection_cardano_transaction_pool = Some(Arc::new(ResourcePool::new(
connection_pool_size,
connections,
)));
}

Ok(self
.sqlite_connection_cardano_transaction_pool
.as_ref()
.cloned()
.unwrap())
Expand Down Expand Up @@ -667,7 +702,8 @@ impl DependenciesBuilder {

async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
let transaction_store = CardanoTransactionRepository::new(
self.get_sqlite_connection_cardano_transaction().await?,
self.get_sqlite_connection_cardano_transaction_pool()
.await?,
);

Ok(Arc::new(transaction_store))
Expand Down Expand Up @@ -1154,7 +1190,12 @@ impl DependenciesBuilder {
config: self.configuration.clone(),
signed_entity_config: self.get_signed_entity_config()?,
sqlite_connection: self.get_sqlite_connection().await?,
sqlite_connection_transaction: self.get_sqlite_connection_cardano_transaction().await?,
sqlite_connection_cardano_transaction: self
.get_sqlite_connection_cardano_transaction()
.await?,
sqlite_connection_cardano_transaction_pool: self
.get_sqlite_connection_cardano_transaction_pool()
.await?,
stake_store: self.get_stake_store().await?,
snapshot_uploader: self.get_snapshot_uploader().await?,
multi_signer: self.get_multi_signer().await?,
Expand Down
7 changes: 6 additions & 1 deletion mithril-aggregator/src/dependency_injection/containers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use mithril_common::resource_pool::ResourcePool;
use mithril_persistence::sqlite::SqliteConnectionWrapper;
use std::sync::Arc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -56,7 +58,10 @@ pub struct DependencyContainer {
pub sqlite_connection: Arc<SqliteConnection>,

/// SQLite database connection for Cardano transactions
pub sqlite_connection_transaction: Arc<SqliteConnection>,
pub sqlite_connection_cardano_transaction: Arc<SqliteConnection>,

/// Cardano transactions SQLite database connection pool
pub sqlite_connection_cardano_transaction_pool: Arc<ResourcePool<SqliteConnectionWrapper>>,

/// Stake Store used by the StakeDistributionService
/// It shall be a private dependency.
Expand Down
58 changes: 48 additions & 10 deletions mithril-aggregator/src/services/cardano_transactions_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl TransactionsImporter for CardanoTransactionsImporter {

#[cfg(test)]
mod tests {
use mithril_common::resource_pool::ResourcePool;
use mithril_persistence::sqlite::SqliteConnectionWrapper;
use mockall::mock;

use mithril_common::cardano_block_scanner::{
Expand Down Expand Up @@ -250,7 +252,11 @@ mod tests {
#[tokio::test]
async fn if_nothing_stored_parse_and_store_all_transactions() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

let blocks = vec![
ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]),
Expand Down Expand Up @@ -280,7 +286,11 @@ mod tests {
#[tokio::test]
async fn if_nothing_stored_parse_and_store_all_block_ranges() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

let blocks = build_blocks(0, BlockRange::LENGTH * 5 + 1);
let transactions = into_transactions(&blocks);
Expand Down Expand Up @@ -315,7 +325,11 @@ mod tests {
#[tokio::test]
async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

// Two block ranges with a gap
let blocks: Vec<ScannedBlock> = [
Expand Down Expand Up @@ -352,7 +366,11 @@ mod tests {
#[tokio::test]
async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
Expand All @@ -375,7 +393,11 @@ mod tests {
async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
let up_to_block_number = 12;
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
let scanner = DumbBlockScanner::new(vec![
ScannedBlock::new("block_hash-1", 10, 15, 10, vec!["tx_hash-1", "tx_hash-2"]),
ScannedBlock::new("block_hash-2", 20, 25, 11, vec!["tx_hash-3", "tx_hash-4"]),
Expand All @@ -402,7 +424,11 @@ mod tests {
#[tokio::test]
async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

let highest_stored_chain_point = ChainPoint::new(134, 10, "block_hash-1");
let stored_block = ScannedBlock::new(
Expand Down Expand Up @@ -457,7 +483,11 @@ mod tests {
#[tokio::test]
async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

let blocks = build_blocks(0, BlockRange::LENGTH * 4 + 1);
let transactions = into_transactions(&blocks);
Expand Down Expand Up @@ -565,7 +595,11 @@ mod tests {
#[tokio::test]
async fn compute_block_range_merkle_root() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(Arc::new(connection))],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));

// 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation
let blocks = build_blocks(0, BlockRange::LENGTH * 2 + 1);
Expand Down Expand Up @@ -617,10 +651,14 @@ mod tests {

let (importer, repository) = {
let connection = Arc::new(cardano_tx_db_connection().unwrap());
let repository = Arc::new(CardanoTransactionRepository::new(connection.clone()));
let connection_pool = Arc::new(ResourcePool::new(
1,
vec![SqliteConnectionWrapper::new(connection)],
));
let repository = Arc::new(CardanoTransactionRepository::new(connection_pool.clone()));
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(DumbBlockScanner::new(blocks.clone())),
Arc::new(CardanoTransactionRepository::new(connection.clone())),
Arc::new(CardanoTransactionRepository::new(connection_pool.clone())),
);
(importer, repository)
};
Expand Down
Loading

0 comments on commit b0e9332

Please sign in to comment.