diff --git a/Cargo.lock b/Cargo.lock index c88041476c..e900d3a088 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5580,7 +5580,6 @@ dependencies = [ "petgraph 0.5.1", "pin-project 0.4.30", "prost 0.9.0", - "prost-types 0.9.0", "rand", "serde", "tari_common", diff --git a/applications/minotari_app_grpc/proto/network.proto b/applications/minotari_app_grpc/proto/network.proto index 5a0d6c603d..62f6c57cf8 100644 --- a/applications/minotari_app_grpc/proto/network.proto +++ b/applications/minotari_app_grpc/proto/network.proto @@ -39,12 +39,12 @@ message Peer { /// Peer's addresses repeated Address addresses = 3; /// Last connection attempt to peer - google.protobuf.Timestamp last_connection = 4; + uint64 last_connection = 4; /// Flags for the peer. uint32 flags = 5; - google.protobuf.Timestamp banned_until= 6; + uint64 banned_until= 6; string banned_reason= 7; - google.protobuf.Timestamp offline_at = 8; + uint64 offline_at = 8; /// Features supported by the peer uint32 features = 9; /// used as information for more efficient protocol negotiation. diff --git a/applications/minotari_app_grpc/proto/wallet.proto b/applications/minotari_app_grpc/proto/wallet.proto index 2df811fee6..d6919f55d7 100644 --- a/applications/minotari_app_grpc/proto/wallet.proto +++ b/applications/minotari_app_grpc/proto/wallet.proto @@ -188,7 +188,7 @@ message TransactionInfo { uint64 fee = 7; bool is_cancelled = 8; bytes excess_sig = 9; - google.protobuf.Timestamp timestamp = 10; + uint64 timestamp = 10; string message = 11; } diff --git a/applications/minotari_app_grpc/src/conversions/peer.rs b/applications/minotari_app_grpc/src/conversions/peer.rs index 8948fe6dde..5329e32e0f 100644 --- a/applications/minotari_app_grpc/src/conversions/peer.rs +++ b/applications/minotari_app_grpc/src/conversions/peer.rs @@ -23,7 +23,7 @@ use tari_comms::{connectivity::ConnectivityStatus, net_address::MultiaddrWithStats, peer_manager::Peer}; use tari_utilities::ByteArray; -use crate::{conversions::naive_datetime_to_timestamp, tari_rpc as grpc}; +use crate::tari_rpc as grpc; #[allow(clippy::cast_possible_truncation)] #[allow(clippy::cast_sign_loss)] @@ -32,14 +32,18 @@ impl From for grpc::Peer { let public_key = peer.public_key.to_vec(); let node_id = peer.node_id.to_vec(); let mut addresses = Vec::with_capacity(peer.addresses.len()); - let last_connection = peer.addresses.last_seen().map(naive_datetime_to_timestamp); + let last_connection = peer + .addresses + .last_seen() + .map(|f| f.timestamp() as u64) + .unwrap_or_default(); for address in peer.addresses.addresses() { addresses.push(address.clone().into()) } let flags = u32::from(peer.flags.bits()); - let banned_until = peer.banned_until.map(naive_datetime_to_timestamp); + let banned_until = peer.banned_until.map(|f| f.timestamp() as u64).unwrap_or_default(); let banned_reason = peer.banned_reason.to_string(); - let offline_at = peer.offline_at().map(naive_datetime_to_timestamp); + let offline_at = peer.offline_at().map(|f| f.timestamp() as u64).unwrap_or_default(); let features = peer.features.bits(); let supported_protocols = peer.supported_protocols.into_iter().map(|p| p.to_vec()).collect(); diff --git a/applications/minotari_console_wallet/src/automation/commands.rs b/applications/minotari_console_wallet/src/automation/commands.rs index ed20171dde..3647acdc8d 100644 --- a/applications/minotari_console_wallet/src/automation/commands.rs +++ b/applications/minotari_console_wallet/src/automation/commands.rs @@ -1117,7 +1117,7 @@ async fn get_tip_height(wallet: &WalletSqlite) -> Option { .await .ok() .and_then(|t| t.metadata) - .and_then(|m| m.height_of_longest_chain), + .map(|m| m.height_of_longest_chain), None => None, } } diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index e161327db4..010c700775 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -28,59 +28,56 @@ use futures::{ SinkExt, }; use log::*; -use minotari_app_grpc::{ - conversions::naive_datetime_to_timestamp, - tari_rpc::{ - self, - payment_recipient::PaymentType, - wallet_server, - CheckConnectivityResponse, - ClaimHtlcRefundRequest, - ClaimHtlcRefundResponse, - ClaimShaAtomicSwapRequest, - ClaimShaAtomicSwapResponse, - CoinSplitRequest, - CoinSplitResponse, - CommitmentSignature, - CreateBurnTransactionRequest, - CreateBurnTransactionResponse, - CreateTemplateRegistrationRequest, - CreateTemplateRegistrationResponse, - GetAddressResponse, - GetBalanceRequest, - GetBalanceResponse, - GetCoinbaseRequest, - GetCoinbaseResponse, - GetCompletedTransactionsRequest, - GetCompletedTransactionsResponse, - GetConnectivityRequest, - GetIdentityRequest, - GetIdentityResponse, - GetTransactionInfoRequest, - GetTransactionInfoResponse, - GetUnspentAmountsResponse, - GetVersionRequest, - GetVersionResponse, - ImportUtxosRequest, - ImportUtxosResponse, - RegisterValidatorNodeRequest, - RegisterValidatorNodeResponse, - RevalidateRequest, - RevalidateResponse, - SendShaAtomicSwapRequest, - SendShaAtomicSwapResponse, - SetBaseNodeRequest, - SetBaseNodeResponse, - TransactionDirection, - TransactionEvent, - TransactionEventRequest, - TransactionEventResponse, - TransactionInfo, - TransactionStatus, - TransferRequest, - TransferResponse, - TransferResult, - }, +use minotari_app_grpc::tari_rpc::{ + self, + payment_recipient::PaymentType, + wallet_server, + CheckConnectivityResponse, + ClaimHtlcRefundRequest, + ClaimHtlcRefundResponse, + ClaimShaAtomicSwapRequest, + ClaimShaAtomicSwapResponse, + CoinSplitRequest, + CoinSplitResponse, + CommitmentSignature, + CreateBurnTransactionRequest, + CreateBurnTransactionResponse, + CreateTemplateRegistrationRequest, + CreateTemplateRegistrationResponse, + GetAddressResponse, + GetBalanceRequest, + GetBalanceResponse, + GetCoinbaseRequest, + GetCoinbaseResponse, + GetCompletedTransactionsRequest, + GetCompletedTransactionsResponse, + GetConnectivityRequest, + GetIdentityRequest, + GetIdentityResponse, + GetTransactionInfoRequest, + GetTransactionInfoResponse, + GetUnspentAmountsResponse, + GetVersionRequest, + GetVersionResponse, + ImportUtxosRequest, + ImportUtxosResponse, + RegisterValidatorNodeRequest, + RegisterValidatorNodeResponse, + RevalidateRequest, + RevalidateResponse, + SendShaAtomicSwapRequest, + SendShaAtomicSwapResponse, + SetBaseNodeRequest, + SetBaseNodeResponse, + TransactionDirection, + TransactionEvent, + TransactionEventRequest, + TransactionEventResponse, + TransactionInfo, + TransactionStatus, + TransferRequest, + TransferResponse, + TransferResult, }; use minotari_wallet::{ connectivity_service::{OnlineStatus, WalletConnectivityInterface}, @@ -777,7 +774,7 @@ impl wallet_server::Wallet for WalletGrpcServer { is_cancelled: txn.cancelled.is_some(), direction: TransactionDirection::from(txn.direction) as i32, fee: txn.fee.into(), - timestamp: Some(naive_datetime_to_timestamp(txn.timestamp)), + timestamp: txn.timestamp.timestamp() as u64, excess_sig: txn .transaction .first_kernel_excess_sig() @@ -1113,7 +1110,7 @@ fn convert_wallet_transaction_into_transaction_info( direction: TransactionDirection::Inbound as i32, fee: 0, excess_sig: Default::default(), - timestamp: Some(naive_datetime_to_timestamp(tx.timestamp)), + timestamp: tx.timestamp.timestamp() as u64, message: tx.message, }, PendingOutbound(tx) => TransactionInfo { @@ -1126,7 +1123,7 @@ fn convert_wallet_transaction_into_transaction_info( direction: TransactionDirection::Outbound as i32, fee: tx.fee.into(), excess_sig: Default::default(), - timestamp: Some(naive_datetime_to_timestamp(tx.timestamp)), + timestamp: tx.timestamp.timestamp() as u64, message: tx.message, }, Completed(tx) => TransactionInfo { @@ -1138,7 +1135,7 @@ fn convert_wallet_transaction_into_transaction_info( is_cancelled: tx.cancelled.is_some(), direction: TransactionDirection::from(tx.direction) as i32, fee: tx.fee.into(), - timestamp: Some(naive_datetime_to_timestamp(tx.timestamp)), + timestamp: tx.timestamp.timestamp() as u64, excess_sig: tx .transaction .first_kernel_excess_sig() diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index 7cefa67a55..fbd89604e9 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -234,6 +234,7 @@ mod test { }; use tari_service_framework::reply_channel; use tari_test_utils::unpack_enum; + use tari_utilities::epoch_time::EpochTime; use tokio::{sync::broadcast, task}; use super::*; @@ -254,14 +255,14 @@ mod test { fn create_sample_proto_chain_metadata() -> proto::ChainMetadata { let diff: u128 = 1; proto::ChainMetadata { - height_of_longest_chain: Some(1), + height_of_longest_chain: 1, best_block: vec![ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, ], pruned_height: 0, accumulated_difficulty: diff.to_be_bytes().to_vec(), - timestamp: Some(0), + timestamp: EpochTime::now().as_u64(), } } @@ -290,7 +291,7 @@ mod test { let (mut service, liveness_mock_state, mut base_node_receiver, _) = setup(); let mut proto_chain_metadata = create_sample_proto_chain_metadata(); - proto_chain_metadata.height_of_longest_chain = Some(123); + proto_chain_metadata.height_of_longest_chain = 123; let chain_metadata = proto_chain_metadata.clone().try_into().unwrap(); task::spawn(async move { @@ -309,7 +310,7 @@ mod test { unpack_enum!(LivenessRequest::SetMetadataEntry(metadata_key, data) = last_call); assert_eq!(metadata_key, MetadataKey::ChainMetadata); let chain_metadata = proto::ChainMetadata::decode(data.as_slice()).unwrap(); - assert_eq!(chain_metadata.height_of_longest_chain, Some(123)); + assert_eq!(chain_metadata.height_of_longest_chain, 123); } #[tokio::test] async fn handle_liveness_event_ok() { @@ -332,7 +333,7 @@ mod test { assert_eq!(*metadata.node_id(), node_id); assert_eq!( metadata.claimed_chain_metadata().height_of_longest_chain(), - proto_chain_metadata.height_of_longest_chain.unwrap() + proto_chain_metadata.height_of_longest_chain ); } diff --git a/base_layer/core/src/base_node/proto/chain_metadata.proto b/base_layer/core/src/base_node/proto/chain_metadata.proto index 10f9b5a82f..8f77ffac0c 100644 --- a/base_layer/core/src/base_node/proto/chain_metadata.proto +++ b/base_layer/core/src/base_node/proto/chain_metadata.proto @@ -9,7 +9,7 @@ package tari.base_node; message ChainMetadata { // The current chain height, or the block number of the longest valid chain, or `None` if there is no chain - google.protobuf.UInt64Value height_of_longest_chain = 1; + uint64 height_of_longest_chain = 1; // The block hash of the current tip of the longest valid chain, or `None` for an empty chain bytes best_block = 2; // The current geometric mean of the pow of the chain tip, or `None` if there is no chain @@ -20,5 +20,5 @@ message ChainMetadata { // Archival nodes wil always have an `pruned_height` of zero. uint64 pruned_height = 6; // Timestamp of the last block in the chain, or `None` if there is no chain - google.protobuf.UInt64Value timestamp = 7; + uint64 timestamp = 7; } diff --git a/base_layer/core/src/base_node/proto/chain_metadata.rs b/base_layer/core/src/base_node/proto/chain_metadata.rs index 3e56b4de16..33a19dcee7 100644 --- a/base_layer/core/src/base_node/proto/chain_metadata.rs +++ b/base_layer/core/src/base_node/proto/chain_metadata.rs @@ -45,9 +45,8 @@ impl TryFrom for ChainMetadata { let mut acc_diff = [0; ACC_DIFFICULTY_ARRAY_LEN]; acc_diff.copy_from_slice(&metadata.accumulated_difficulty[0..ACC_DIFFICULTY_ARRAY_LEN]); let accumulated_difficulty = u128::from_be_bytes(acc_diff); - let height_of_longest_chain = metadata - .height_of_longest_chain - .ok_or_else(|| "Height of longest chain is missing".to_string())?; + let height_of_longest_chain = metadata.height_of_longest_chain; + let pruning_horizon = if metadata.pruned_height == 0 { metadata.pruned_height } else { @@ -67,7 +66,7 @@ impl TryFrom for ChainMetadata { pruning_horizon, metadata.pruned_height, accumulated_difficulty, - metadata.timestamp.unwrap_or_default(), + metadata.timestamp, )) } } @@ -76,17 +75,17 @@ impl From for proto::ChainMetadata { fn from(metadata: ChainMetadata) -> Self { let accumulated_difficulty = metadata.accumulated_difficulty().to_be_bytes().to_vec(); Self { - height_of_longest_chain: Some(metadata.height_of_longest_chain()), + height_of_longest_chain: metadata.height_of_longest_chain(), best_block: metadata.best_block().to_vec(), pruned_height: metadata.pruned_height(), accumulated_difficulty, - timestamp: Some(metadata.timestamp()), + timestamp: metadata.timestamp(), } } } impl proto::ChainMetadata { pub fn height_of_longest_chain(&self) -> u64 { - self.height_of_longest_chain.unwrap_or(0) + self.height_of_longest_chain } } diff --git a/base_layer/core/src/base_node/proto/wallet_rpc.proto b/base_layer/core/src/base_node/proto/wallet_rpc.proto index e0792847fa..26ae9b6a19 100644 --- a/base_layer/core/src/base_node/proto/wallet_rpc.proto +++ b/base_layer/core/src/base_node/proto/wallet_rpc.proto @@ -39,7 +39,7 @@ message TxQueryResponse { uint64 confirmations = 3; bool is_synced = 4; uint64 height_of_longest_chain = 5; - google.protobuf.UInt64Value mined_timestamp = 6; + uint64 mined_timestamp = 6; } message TxQueryBatchResponse { @@ -48,7 +48,7 @@ message TxQueryBatchResponse { bytes block_hash = 3; uint64 confirmations = 4; uint64 block_height = 5; - google.protobuf.UInt64Value mined_timestamp = 6; + uint64 mined_timestamp = 6; } message TxQueryBatchResponses { @@ -56,7 +56,7 @@ message TxQueryBatchResponses { bool is_synced = 2; bytes tip_hash = 3; uint64 height_of_longest_chain = 4; - google.protobuf.UInt64Value tip_mined_timestamp = 5; + uint64 tip_mined_timestamp = 5; } message FetchMatchingUtxos { diff --git a/base_layer/core/src/base_node/proto/wallet_rpc.rs b/base_layer/core/src/base_node/proto/wallet_rpc.rs index 5af9ac4848..c68df96ee2 100644 --- a/base_layer/core/src/base_node/proto/wallet_rpc.rs +++ b/base_layer/core/src/base_node/proto/wallet_rpc.rs @@ -202,6 +202,12 @@ impl TryFrom for TxQueryResponse { }, }) }; + + let mined_timestamp = match proto_response.mined_timestamp { + 0 => None, + t => Some(t), + }; + Ok(Self { location: TxLocation::try_from( proto::TxLocation::from_i32(proto_response.location) @@ -211,7 +217,7 @@ impl TryFrom for TxQueryResponse { confirmations: proto_response.confirmations, is_synced: proto_response.is_synced, height_of_longest_chain: proto_response.height_of_longest_chain, - mined_timestamp: proto_response.mined_timestamp, + mined_timestamp, }) } } @@ -224,7 +230,7 @@ impl From for proto::TxQueryResponse { confirmations: response.confirmations, is_synced: response.is_synced, height_of_longest_chain: response.height_of_longest_chain, - mined_timestamp: response.mined_timestamp, + mined_timestamp: response.mined_timestamp.unwrap_or_default(), } } } @@ -243,6 +249,10 @@ impl TryFrom for TxQueryBatchResponse { }, }) }; + let mined_timestamp = match proto_response.mined_timestamp { + 0 => None, + t => Some(t), + }; Ok(Self { signature: Signature::try_from( proto_response @@ -256,7 +266,7 @@ impl TryFrom for TxQueryBatchResponse { block_hash: hash, block_height: proto_response.block_height, confirmations: proto_response.confirmations, - mined_timestamp: proto_response.mined_timestamp, + mined_timestamp, }) } } diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index 3172e21e5b..47bdf23b14 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -129,7 +129,7 @@ impl BaseNodeWalletRpcService { confirmations, is_synced, height_of_longest_chain: chain_metadata.height_of_longest_chain(), - mined_timestamp: Some(header.timestamp.as_u64()), + mined_timestamp: header.timestamp.as_u64(), }; return Ok(response); }, @@ -150,7 +150,7 @@ impl BaseNodeWalletRpcService { confirmations: 0, is_synced, height_of_longest_chain: chain_metadata.height_of_longest_chain(), - mined_timestamp: None, + mined_timestamp: 0, }, TxStorageResponse::ReorgPool | TxStorageResponse::NotStoredOrphan | @@ -165,7 +165,7 @@ impl BaseNodeWalletRpcService { confirmations: 0, is_synced, height_of_longest_chain: chain_metadata.height_of_longest_chain(), - mined_timestamp: None, + mined_timestamp: 0, }, }; Ok(mempool_response) @@ -320,7 +320,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc is_synced, tip_hash: metadata.best_block().to_vec(), height_of_longest_chain: metadata.height_of_longest_chain(), - tip_mined_timestamp: Some(metadata.timestamp()), + tip_mined_timestamp: metadata.timestamp(), })) } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs index ace948adb5..3b8f7834b9 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs @@ -350,9 +350,7 @@ where Some(( batch_response.height_of_longest_chain, tip, - batch_response.tip_mined_timestamp.ok_or_else(|| { - TransactionServiceError::ProtobufConversionError("Missing `tip_hash` field".to_string()) - })?, + batch_response.tip_mined_timestamp, )), )) } diff --git a/base_layer/wallet/tests/support/comms_rpc.rs b/base_layer/wallet/tests/support/comms_rpc.rs index 82ea00410c..56eae2e17e 100644 --- a/base_layer/wallet/tests/support/comms_rpc.rs +++ b/base_layer/wallet/tests/support/comms_rpc.rs @@ -70,6 +70,7 @@ use tari_core::{ }, transactions::transaction_components::{Transaction, TransactionOutput}, }; +use tari_utilities::epoch_time::EpochTime; use tokio::{sync::mpsc, time::sleep}; pub async fn connect_rpc_client(connection: &mut PeerConnection) -> T @@ -144,15 +145,15 @@ impl BaseNodeWalletRpcMockState { tip_hash: FixedHash::zero().to_vec(), is_synced: true, height_of_longest_chain: 0, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: EpochTime::now().as_u64(), })), tip_info_response: Arc::new(Mutex::new(TipInfoResponse { metadata: Some(ChainMetadataProto { - height_of_longest_chain: Some(std::i64::MAX as u64), + height_of_longest_chain: std::i64::MAX as u64, best_block: FixedHash::zero().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: EpochTime::now().as_u64(), }), is_synced: true, })), @@ -879,6 +880,7 @@ mod test { proto::base_node::{ChainMetadata, TipInfoResponse}, transactions::transaction_components::Transaction, }; + use tari_utilities::epoch_time::EpochTime; use tokio::time::Duration; use crate::support::comms_rpc::BaseNodeWalletRpcMockService; @@ -931,11 +933,11 @@ mod test { assert_eq!(calls.len(), 1); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(444), + height_of_longest_chain: 444, best_block: vec![], accumulated_difficulty: vec![], pruned_height: 0, - timestamp: Some(0), + timestamp: EpochTime::now().as_u64(), }; service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata), diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index b2af430060..3e64b78529 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -159,7 +159,7 @@ use tari_script::{inputs, one_sided_payment_script, script, ExecutionStack}; use tari_service_framework::{reply_channel, RegisterHandle, StackBuilder}; use tari_shutdown::{Shutdown, ShutdownSignal}; use tari_test_utils::{comms_and_services::get_next_memory_address, random}; -use tari_utilities::{ByteArray, SafePassword}; +use tari_utilities::{epoch_time::EpochTime, ByteArray, SafePassword}; use tempfile::tempdir; use tokio::{ sync::{broadcast, broadcast::channel}, @@ -3762,6 +3762,7 @@ async fn test_coinbase_generation_and_monitoring() { let tx1 = db.get_completed_transaction(tx_id1).unwrap(); let tx2b = db.get_completed_transaction(tx_id2b).unwrap(); + let timestamp = EpochTime::now().as_u64(); let mut block_headers = HashMap::new(); for i in 0..=4 { let mut block_header = BlockHeader::new(1); @@ -3780,7 +3781,7 @@ async fn test_coinbase_generation_and_monitoring() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -3790,7 +3791,7 @@ async fn test_coinbase_generation_and_monitoring() { block_hash: block_headers.get(&1).unwrap().hash().to_vec(), confirmations: 0, block_height: 1, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, ]; let batch_query_response = TxQueryBatchResponsesProto { @@ -3798,7 +3799,7 @@ async fn test_coinbase_generation_and_monitoring() { is_synced: true, tip_hash: block_headers.get(&1).unwrap().hash().to_vec(), height_of_longest_chain: 1, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface @@ -3841,7 +3842,7 @@ async fn test_coinbase_generation_and_monitoring() { block_hash: block_headers.get(&4).unwrap().hash().to_vec(), confirmations: 3, block_height: 4, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }); let batch_query_response = TxQueryBatchResponsesProto { @@ -3849,7 +3850,7 @@ async fn test_coinbase_generation_and_monitoring() { is_synced: true, tip_hash: block_headers.get(&4).unwrap().hash().to_vec(), height_of_longest_chain: 4, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface .base_node_rpc_mock_state @@ -3918,13 +3919,15 @@ async fn test_coinbase_abandoned() { MicroMinotari::from(0) ); + let timestamp = EpochTime::now().as_u64(); + let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { signature: Some(SignatureProto::from(tx1.first_kernel_excess_sig().unwrap().clone())), location: TxLocationProto::from(TxLocation::InMempool) as i32, block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }]; let batch_query_response = TxQueryBatchResponsesProto { @@ -3932,7 +3935,7 @@ async fn test_coinbase_abandoned() { is_synced: true, tip_hash: [5u8; 32].to_vec(), height_of_longest_chain: block_height_a + TransactionServiceConfig::default().num_confirmations_required + 1, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface @@ -4049,7 +4052,7 @@ async fn test_coinbase_abandoned() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from(tx2.first_kernel_excess_sig().unwrap().clone())), @@ -4057,7 +4060,7 @@ async fn test_coinbase_abandoned() { block_hash: [11u8; 32].to_vec(), confirmations: 2, block_height: block_height_b, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, ]; @@ -4066,7 +4069,7 @@ async fn test_coinbase_abandoned() { is_synced: true, tip_hash: [13u8; 32].to_vec(), height_of_longest_chain: block_height_b + 2, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface @@ -4135,7 +4138,7 @@ async fn test_coinbase_abandoned() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from(tx2.first_kernel_excess_sig().unwrap().clone())), @@ -4143,7 +4146,7 @@ async fn test_coinbase_abandoned() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, ]; @@ -4152,7 +4155,7 @@ async fn test_coinbase_abandoned() { is_synced: true, tip_hash: [12u8; 32].to_vec(), height_of_longest_chain: block_height_b + TransactionServiceConfig::default().num_confirmations_required + 1, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface @@ -4252,7 +4255,7 @@ async fn test_coinbase_abandoned() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from(tx2.first_kernel_excess_sig().unwrap().clone())), @@ -4260,7 +4263,7 @@ async fn test_coinbase_abandoned() { block_hash: block_headers.get(&10).unwrap().hash().to_vec(), confirmations: 5, block_height: 10, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, ]; @@ -4269,7 +4272,7 @@ async fn test_coinbase_abandoned() { is_synced: true, tip_hash: [20u8; 32].to_vec(), height_of_longest_chain: 20, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; alice_ts_interface diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index db32643f06..9bad075823 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -88,6 +88,7 @@ use tari_core::{ use tari_service_framework::{reply_channel, reply_channel::Receiver}; use tari_shutdown::Shutdown; use tari_test_utils::random; +use tari_utilities::epoch_time::EpochTime; use tempfile::{tempdir, TempDir}; use tokio::{sync::broadcast, task, time::sleep}; @@ -780,6 +781,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let tx2 = resources.db.get_completed_transaction(2u64.into()).unwrap(); + let timestamp = EpochTime::now().as_u64(); let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { signature: Some(SignatureProto::from( tx2.transaction.first_kernel_excess_sig().unwrap().clone(), @@ -788,7 +790,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { block_hash: [1u8; 32].to_vec(), confirmations: 0, block_height: 1, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }]; let mut batch_query_response = TxQueryBatchResponsesProto { @@ -796,7 +798,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { is_synced: true, tip_hash: [1u8; 32].to_vec(), height_of_longest_chain: 1, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); @@ -862,7 +864,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { block_hash: [5u8; 32].to_vec(), confirmations: 4, block_height: 5, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }]; let batch_query_response = TxQueryBatchResponsesProto { @@ -870,7 +872,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { is_synced: true, tip_hash: [5u8; 32].to_vec(), height_of_longest_chain: 5, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); @@ -936,6 +938,7 @@ async fn tx_revalidation() { let tx2 = resources.db.get_completed_transaction(2u64.into()).unwrap(); + let timestamp = EpochTime::now().as_u64(); // set tx2 as fully mined let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -945,7 +948,7 @@ async fn tx_revalidation() { block_hash: [5u8; 32].to_vec(), confirmations: 4, block_height: 5, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }]; let batch_query_response = TxQueryBatchResponsesProto { @@ -953,7 +956,7 @@ async fn tx_revalidation() { is_synced: true, tip_hash: [5u8; 32].to_vec(), height_of_longest_chain: 5, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); @@ -987,7 +990,7 @@ async fn tx_revalidation() { block_hash: [5u8; 32].to_vec(), confirmations: 8, block_height: 10, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }]; let batch_query_response = TxQueryBatchResponsesProto { @@ -995,7 +998,7 @@ async fn tx_revalidation() { is_synced: true, tip_hash: [5u8; 32].to_vec(), height_of_longest_chain: 10, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); @@ -1098,6 +1101,7 @@ async fn tx_validation_protocol_reorg() { let coinbase_tx1 = resources.db.get_completed_transaction(6u64.into()).unwrap(); let coinbase_tx2 = resources.db.get_completed_transaction(7u64.into()).unwrap(); + let timestamp = EpochTime::now().as_u64(); let transaction_query_batch_responses = vec![ TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1107,7 +1111,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&5).unwrap().hash().to_vec(), confirmations: 5, block_height: 5, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1117,7 +1121,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&6).unwrap().hash().to_vec(), confirmations: 4, block_height: 6, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1127,7 +1131,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&7).unwrap().hash().to_vec(), confirmations: 3, block_height: 7, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1137,7 +1141,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&8).unwrap().hash().to_vec(), confirmations: 2, block_height: 8, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1147,7 +1151,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&8).unwrap().hash().to_vec(), confirmations: 2, block_height: 8, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1157,7 +1161,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&9).unwrap().hash().to_vec(), confirmations: 1, block_height: 9, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1167,7 +1171,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&9).unwrap().hash().to_vec(), confirmations: 1, block_height: 9, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, ]; @@ -1176,7 +1180,7 @@ async fn tx_validation_protocol_reorg() { is_synced: true, tip_hash: block_headers.get(&10).unwrap().hash().to_vec(), height_of_longest_chain: 10, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); @@ -1228,7 +1232,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&5).unwrap().hash().to_vec(), confirmations: 4, block_height: 5, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1238,7 +1242,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&6).unwrap().hash().to_vec(), confirmations: 3, block_height: 6, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1248,7 +1252,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&7).unwrap().hash().to_vec(), confirmations: 2, block_height: 7, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1258,7 +1262,7 @@ async fn tx_validation_protocol_reorg() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1268,7 +1272,7 @@ async fn tx_validation_protocol_reorg() { block_hash: block_headers.get(&8).unwrap().hash().to_vec(), confirmations: 1, block_height: 8, - mined_timestamp: Some(0), + mined_timestamp: timestamp, }, TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1278,7 +1282,7 @@ async fn tx_validation_protocol_reorg() { block_hash: vec![], confirmations: 0, block_height: 0, - mined_timestamp: None, + mined_timestamp: 0, }, ]; @@ -1287,7 +1291,7 @@ async fn tx_validation_protocol_reorg() { is_synced: true, tip_hash: block_headers.get(&8).unwrap().hash().to_vec(), height_of_longest_chain: 8, - tip_mined_timestamp: Some(0), + tip_mined_timestamp: timestamp, }; rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); diff --git a/base_layer/wallet/tests/utxo_scanner/mod.rs b/base_layer/wallet/tests/utxo_scanner/mod.rs index 09f2929bac..0ad4d3ccca 100644 --- a/base_layer/wallet/tests/utxo_scanner/mod.rs +++ b/base_layer/wallet/tests/utxo_scanner/mod.rs @@ -313,11 +313,11 @@ async fn test_utxo_scanner_recovery() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS - 1), + height_of_longest_chain: NUM_BLOCKS - 1, best_block: block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata), @@ -412,11 +412,11 @@ async fn test_utxo_scanner_recovery_with_restart() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS - 1), + height_of_longest_chain: NUM_BLOCKS - 1, best_block: block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata.clone()), @@ -578,11 +578,11 @@ async fn test_utxo_scanner_recovery_with_restart_and_reorg() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS - 1), + height_of_longest_chain: NUM_BLOCKS - 1, best_block: block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata.clone()), @@ -651,11 +651,11 @@ async fn test_utxo_scanner_recovery_with_restart_and_reorg() { .set_utxos_by_block(utxos_by_block.clone()); test_interface2.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(9), + height_of_longest_chain: 9, best_block: block_headers.get(&9).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface2 .rpc_service_state @@ -776,7 +776,7 @@ async fn test_utxo_scanner_scanned_block_cache_clearing() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(800 + NUM_BLOCKS - 1), + height_of_longest_chain: 800 + NUM_BLOCKS - 1, best_block: block_headers .get(&(800 + NUM_BLOCKS - 1)) .unwrap() @@ -785,7 +785,7 @@ async fn test_utxo_scanner_scanned_block_cache_clearing() { .to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata), @@ -878,11 +878,11 @@ async fn test_utxo_scanner_one_sided_payments() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS - 1), + height_of_longest_chain: NUM_BLOCKS - 1, best_block: block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata), @@ -998,11 +998,11 @@ async fn test_utxo_scanner_one_sided_payments() { .set_one_sided_payment_message("new one-sided message".to_string()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS), + height_of_longest_chain: NUM_BLOCKS, best_block: block_headers.get(&(NUM_BLOCKS)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { @@ -1085,11 +1085,11 @@ async fn test_birthday_timestamp_over_chain() { test_interface.rpc_service_state.set_blocks(block_headers.clone()); let chain_metadata = ChainMetadata { - height_of_longest_chain: Some(NUM_BLOCKS - 1), + height_of_longest_chain: NUM_BLOCKS - 1, best_block: block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash().to_vec(), accumulated_difficulty: Vec::new(), pruned_height: 0, - timestamp: Some(0), + timestamp: 0, }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { metadata: Some(chain_metadata), diff --git a/comms/dht/Cargo.toml b/comms/dht/Cargo.toml index 6d0be4e336..ecb7c4646e 100644 --- a/comms/dht/Cargo.toml +++ b/comms/dht/Cargo.toml @@ -31,7 +31,6 @@ futures = "^0.3.1" log = "0.4.8" log-mdc = "0.1.0" prost = "=0.9.0" -prost-types = "=0.9.0" rand = "0.8" serde = "1.0.90" thiserror = "1.0.26" diff --git a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index e14162b245..920a037054 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -21,7 +21,6 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ - cmp, convert::{TryFrom, TryInto}, fmt, fmt::Display, @@ -29,7 +28,6 @@ use std::{ use bitflags::bitflags; use chrono::{DateTime, NaiveDateTime, Utc}; -use prost_types::Timestamp; use serde::{Deserialize, Serialize}; use tari_comms::{message::MessageTag, peer_manager::NodeId, types::CommsPublicKey, NodeIdentity}; use tari_utilities::{epoch_time::EpochTime, ByteArray, ByteArrayError}; @@ -39,21 +37,6 @@ use thiserror::Error; pub use crate::proto::envelope::{dht_header::Destination, DhtEnvelope, DhtHeader, DhtMessageType}; use crate::version::DhtProtocolVersion; -/// Utility function that converts a `chrono::DateTime` to a `prost_type::Timestamp` -pub(crate) fn datetime_to_timestamp(datetime: DateTime) -> Timestamp { - Timestamp { - seconds: datetime.timestamp(), - nanos: datetime.timestamp_subsec_nanos().try_into().unwrap_or(i32::MAX), - } -} - -/// Utility function that converts a `prost::Timestamp` to a `chrono::DateTime` -pub(crate) fn timestamp_to_datetime(timestamp: Timestamp) -> Option> { - let naive = - NaiveDateTime::from_timestamp_opt(timestamp.seconds, u32::try_from(cmp::max(0, timestamp.nanos)).unwrap())?; - Some(DateTime::from_utc(naive, Utc)) -} - /// Utility function that converts a `chrono::DateTime` to a `EpochTime` pub(crate) fn datetime_to_epochtime(datetime: DateTime) -> EpochTime { #[allow(clippy::cast_sign_loss)] @@ -229,7 +212,11 @@ impl TryFrom for DhtMessageHeader { ) }; - let expires = header.expires.and_then(timestamp_to_datetime); + let expires = match header.expires { + 0 => None, + t => Some(EpochTime::from_secs_since_epoch(t)), + }; + let version = DhtProtocolVersion::try_from(header.major)?; Ok(Self { @@ -240,7 +227,7 @@ impl TryFrom for DhtMessageHeader { message_type: DhtMessageType::from_i32(header.message_type).ok_or(DhtMessageError::InvalidMessageType)?, flags: DhtMessageFlags::from_bits(header.flags).ok_or(DhtMessageError::InvalidMessageFlags)?, message_tag: MessageTag::from(header.message_tag), - expires: expires.map(datetime_to_epochtime), + expires, }) } } @@ -258,7 +245,6 @@ impl TryFrom> for DhtMessageHeader { impl From for DhtHeader { fn from(header: DhtMessageHeader) -> Self { - let expires = header.expires.map(epochtime_to_datetime); Self { major: header.version.as_major(), ephemeral_public_key: header @@ -271,7 +257,7 @@ impl From for DhtHeader { message_type: header.message_type as i32, flags: header.flags.bits(), message_tag: header.message_tag.as_value(), - expires: expires.map(datetime_to_timestamp), + expires: header.expires.map(EpochTime::as_u64).unwrap_or_default(), } } } diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index d4a046f419..f82f4e0b8c 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -51,7 +51,7 @@ use crate::{ crypt, dedup, discovery::DhtDiscoveryRequester, - envelope::{datetime_to_epochtime, datetime_to_timestamp, DhtMessageFlags, DhtMessageHeader, NodeDestination}, + envelope::{datetime_to_epochtime, DhtMessageFlags, DhtMessageHeader, NodeDestination}, message_signature::MessageSignature, outbound::{ message::{DhtOutboundMessage, OutboundEncryption, SendFailure}, @@ -447,7 +447,7 @@ where S: Service ephemeral_public_key: ephemeral_public_key.clone(), message_signature: message_signature.clone(), is_broadcast, - expires: expires.map(datetime_to_timestamp), + expires: expires_epochtime.map(EpochTime::as_u64), }, send_state, ) diff --git a/comms/dht/src/outbound/message.rs b/comms/dht/src/outbound/message.rs index 226942f1b6..c2353446c7 100644 --- a/comms/dht/src/outbound/message.rs +++ b/comms/dht/src/outbound/message.rs @@ -174,7 +174,7 @@ pub struct DhtOutboundMessage { pub reply: MessagingReplyTx, pub dht_flags: DhtMessageFlags, pub is_broadcast: bool, - pub expires: Option, + pub expires: Option, } impl fmt::Display for DhtOutboundMessage { diff --git a/comms/dht/src/outbound/serialize.rs b/comms/dht/src/outbound/serialize.rs index a32c87c807..a69dddfe61 100644 --- a/comms/dht/src/outbound/serialize.rs +++ b/comms/dht/src/outbound/serialize.rs @@ -95,7 +95,7 @@ where flags: dht_flags.bits(), destination: Some(destination.into()), message_tag: tag.as_value(), - expires, + expires: expires.unwrap_or_default(), }); let envelope = DhtEnvelope::new(dht_header, body.into()); diff --git a/comms/dht/src/proto/envelope.proto b/comms/dht/src/proto/envelope.proto index 38336a0b5b..d54a5ba39e 100644 --- a/comms/dht/src/proto/envelope.proto +++ b/comms/dht/src/proto/envelope.proto @@ -45,7 +45,7 @@ message DhtHeader { // Message trace ID uint64 message_tag = 11; // Expiry timestamp for the message - google.protobuf.Timestamp expires = 12; + uint64 expires = 12; } message DhtEnvelope { diff --git a/comms/dht/src/proto/store_forward.proto b/comms/dht/src/proto/store_forward.proto index 830004e5ea..c0445dab08 100644 --- a/comms/dht/src/proto/store_forward.proto +++ b/comms/dht/src/proto/store_forward.proto @@ -13,14 +13,14 @@ package tari.dht.store_forward; // start_time is provided then only messages after the specified time will be sent, otherwise all applicable messages // will be sent. message StoredMessagesRequest { - google.protobuf.Timestamp since = 1; + uint64 since = 1; uint32 request_id = 2; uint32 limit = 3; } // Storage for a single message envelope, including the date and time when the element was stored message StoredMessage { - google.protobuf.Timestamp stored_at = 1; + uint64 stored_at = 1; uint32 version = 2; tari.dht.envelope.DhtHeader dht_header = 3; bytes body = 4; diff --git a/comms/dht/src/store_forward/message.rs b/comms/dht/src/store_forward/message.rs index bdce545204..0d30498773 100644 --- a/comms/dht/src/store_forward/message.rs +++ b/comms/dht/src/store_forward/message.rs @@ -27,7 +27,7 @@ use prost::Message; use rand::{rngs::OsRng, RngCore}; use crate::{ - envelope::datetime_to_timestamp, + envelope::datetime_to_epochtime, proto::{ envelope::DhtHeader, store_forward::{StoredMessage, StoredMessagesRequest, StoredMessagesResponse}, @@ -38,7 +38,7 @@ use crate::{ impl StoredMessagesRequest { pub fn new() -> Self { Self { - since: None, + since: 0, request_id: OsRng.next_u32(), limit: 0, } @@ -46,7 +46,7 @@ impl StoredMessagesRequest { pub fn since(since: DateTime) -> Self { Self { - since: Some(datetime_to_timestamp(since)), + since: datetime_to_epochtime(since).as_u64(), request_id: OsRng.next_u32(), limit: 0, } @@ -65,7 +65,7 @@ impl StoredMessage { version, dht_header: Some(dht_header.into()), body, - stored_at: Some(datetime_to_timestamp(stored_at)), + stored_at: stored_at.timestamp() as u64, } } } @@ -76,7 +76,7 @@ impl TryFrom for StoredMessage { fn try_from(message: database::StoredMessage) -> Result { let dht_header = DhtHeader::decode(message.header.as_slice())?; Ok(Self { - stored_at: Some(datetime_to_timestamp(DateTime::from_utc(message.stored_at, Utc))), + stored_at: message.stored_at.timestamp() as u64, version: message.version as u32, body: message.body, dht_header: Some(dht_header), diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index b1fc068f83..e630d84c41 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -26,7 +26,7 @@ use std::{ sync::Arc, }; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use futures::{future, stream, StreamExt}; use log::*; use prost::Message; @@ -37,7 +37,7 @@ use tari_comms::{ types::{CommsDHKE, CommsPublicKey}, BytesMut, }; -use tari_utilities::ByteArray; +use tari_utilities::{epoch_time::EpochTime, ByteArray}; use tokio::sync::mpsc; use tower::{Service, ServiceExt}; @@ -45,7 +45,7 @@ use crate::{ actor::{DhtRequester, OffenceSeverity}, crypt, dedup, - envelope::{timestamp_to_datetime, DhtMessageError, DhtMessageHeader, NodeDestination}, + envelope::{epochtime_to_datetime, DhtMessageError, DhtMessageHeader, NodeDestination}, inbound::{DecryptedDhtMessage, DhtInboundMessage}, message_signature::{MessageSignature, MessageSignatureError, ProtoMessageSignature}, outbound::{OutboundMessageRequester, SendMessageParams}, @@ -207,8 +207,10 @@ where S: Service query.with_limit(cmp::min(retrieve_msgs.limit, max)); } - let since = match retrieve_msgs.since.and_then(timestamp_to_datetime) { - Some(since) => { + let since = match retrieve_msgs.since { + 0 => None, + since => { + let since = epochtime_to_datetime(EpochTime::from_secs_since_epoch(since)); debug!( target: LOG_TARGET, "Peer '{}' requested all messages since '{}'", @@ -218,7 +220,6 @@ where S: Service query.with_messages_since(since); Some(since) }, - None => None, }; let response_types = vec![SafResponseType::ForMe]; @@ -421,23 +422,7 @@ where S: Service return Err(StoreAndForwardError::DhtMessageError(DhtMessageError::BodyEmpty)); } - let stored_at = message - .stored_at - .map(|t| { - Result::<_, StoreAndForwardError>::Ok(DateTime::from_utc( - NaiveDateTime::from_timestamp_opt(t.seconds, 0).ok_or_else(|| { - StoreAndForwardError::InvalidSafResponseMessage { - field: "stored_at", - details: "number of seconds provided represents more days than can fit in a NaiveDateTime" - .to_string(), - } - })?, - Utc, - )) - }) - .transpose()? - .unwrap_or(DateTime::::MIN_UTC); - + let stored_at = epochtime_to_datetime(EpochTime::from_secs_since_epoch(message.stored_at)); if stored_at > Utc::now() { return Err(StoreAndForwardError::StoredAtWasInFuture); } @@ -795,7 +780,7 @@ mod test { message: String, node_identity: &NodeIdentity, dht_header: DhtMessageHeader, - stored_at: NaiveDateTime, + stored_at: chrono::NaiveDateTime, ) -> StoredMessage { let msg_hash = hex::to_hex(&dedup::create_message_hash( &dht_header.message_signature, @@ -845,7 +830,10 @@ mod test { ) .unwrap(); - let since = Utc::now().checked_sub_signed(chrono::Duration::seconds(60)).unwrap(); + let since = Utc::now() + .checked_sub_signed(chrono::Duration::seconds(60)) + .map(|d| d.with_nanosecond(0).unwrap()) + .unwrap(); let mut message = DecryptedDhtMessage::succeeded( wrap_in_envelope_body!(StoredMessagesRequest::since(since)), None,