diff --git a/base_layer/chat_ffi/chat.h b/base_layer/chat_ffi/chat.h index 08445cc2a3..abb0eef7e8 100644 --- a/base_layer/chat_ffi/chat.h +++ b/base_layer/chat_ffi/chat.h @@ -18,6 +18,8 @@ struct ChatMessageMetadataVector; struct ChatMessages; +struct Confirmation; + struct Message; struct TariAddress; @@ -43,6 +45,10 @@ struct ChatFFIMessage { typedef void (*CallbackMessageReceived)(struct ChatFFIMessage*); +typedef void (*CallbackDeliveryConfirmationReceived)(struct Confirmation*); + +typedef void (*CallbackReadConfirmationReceived)(struct Confirmation*); + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -65,7 +71,9 @@ extern "C" { struct ChatClientFFI *create_chat_client(struct ApplicationConfig *config, int *error_out, CallbackContactStatusChange callback_contact_status_change, - CallbackMessageReceived callback_message_received); + CallbackMessageReceived callback_message_received, + CallbackDeliveryConfirmationReceived callback_delivery_confirmation_received, + CallbackReadConfirmationReceived callback_read_confirmation_received); /** * Frees memory for a ChatClientFFI @@ -118,6 +126,52 @@ struct ApplicationConfig *create_chat_config(const char *network_str, */ void destroy_chat_config(struct ApplicationConfig *config); +/** + * Get a pointer to a ChatByteVector representation of a message id + * + * ## Arguments + * `confirmation` - A pointer to the Confirmation + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `*mut ChatByteVector` - A ptr to a ChatByteVector + * + * # Safety + * The ```confirmation``` When done with the confirmation it should be destroyed + * The ```ChatByteVector``` When done with the returned ChatByteVector it should be destroyed + */ +struct ChatByteVector *read_confirmation_message_id(struct Confirmation *confirmation, + int *error_out); + +/** + * Get a c_uint timestamp for the confirmation + * + * ## Arguments + * `confirmation` - A pointer to the Confirmation + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `c_uint` - A uint representation of time. May return 0 if casting fails + * + * # Safety + * None + */ +unsigned int read_confirmation_timestamp(struct Confirmation *confirmation, int *error_out); + +/** + * Frees memory for a Confirmation + * + * ## Arguments + * `address` - The pointer of a Confirmation + * + * ## Returns + * `()` - Does not return a value, equivalent to void in C + * + * # Safety + * None + */ +void destroy_confirmation(struct Confirmation *address); + /** * Add a contact * @@ -261,6 +315,25 @@ void add_chat_message_metadata(struct Message *message, struct ChatByteVector *data, int *error_out); +/** + * Sends a read confirmation for a given message + * + * ## Arguments + * `client` - The chat client + * `message` - The message that was read + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `*mut TariAddress` - A ptr to a TariAddress + * + * # Safety + * The ```ChatClientFFI``` When done with the client it should be destroyed + * The ```Message``` When done with the Message it should be destroyed + */ +void send_read_confirmation_for_message(struct ChatClientFFI *client, + struct Message *message, + int *error_out); + /** * Creates a tor transport config * diff --git a/base_layer/chat_ffi/src/callback_handler.rs b/base_layer/chat_ffi/src/callback_handler.rs index c0d770973b..0251956129 100644 --- a/base_layer/chat_ffi/src/callback_handler.rs +++ b/base_layer/chat_ffi/src/callback_handler.rs @@ -25,7 +25,7 @@ use std::{convert::TryFrom, ops::Deref}; use log::{debug, error, info, trace}; use tari_contacts::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceHandle}, - types::Message, + types::{Confirmation, Message, MessageDispatch}, }; use tari_shutdown::ShutdownSignal; @@ -35,12 +35,16 @@ const LOG_TARGET: &str = "chat_ffi::callback_handler"; pub(crate) type CallbackContactStatusChange = unsafe extern "C" fn(*mut ChatFFIContactsLivenessData); pub(crate) type CallbackMessageReceived = unsafe extern "C" fn(*mut ChatFFIMessage); +pub(crate) type CallbackDeliveryConfirmationReceived = unsafe extern "C" fn(*mut Confirmation); +pub(crate) type CallbackReadConfirmationReceived = unsafe extern "C" fn(*mut Confirmation); #[derive(Clone)] pub struct CallbackHandler { contacts_service_handle: ContactsServiceHandle, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, shutdown: ShutdownSignal, } @@ -50,12 +54,16 @@ impl CallbackHandler { shutdown: ShutdownSignal, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, ) -> Self { Self { contacts_service_handle, shutdown, callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, } } @@ -67,9 +75,22 @@ impl CallbackHandler { tokio::select! { rec_message = chat_messages.recv() => { match rec_message { - Ok(message) => { - trace!(target: LOG_TARGET, "FFI Callback monitor received a new Message"); - self.trigger_message_received(message.deref().clone()); + Ok(message_dispatch) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new MessageDispatch"); + match message_dispatch.deref() { + MessageDispatch::Message(m) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Message"); + self.trigger_message_received(m.clone()); + } + MessageDispatch::DeliveryConfirmation(c) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Delivery Confirmation"); + self.trigger_delivery_confirmation_received(c.clone()); + }, + MessageDispatch::ReadConfirmation(c) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Read Confirmation"); + self.trigger_read_confirmation_received(c.clone()); + } + }; }, Err(_) => { debug!(target: LOG_TARGET, "FFI Callback monitor had an error receiving new messages")} } @@ -130,4 +151,28 @@ impl CallbackHandler { Err(e) => error!(target: LOG_TARGET, "Error processing message received callback: {}", e), } } + + fn trigger_delivery_confirmation_received(&mut self, confirmation: Confirmation) { + debug!( + target: LOG_TARGET, + "Calling DeliveryConfirmationReceived callback function for message {:?}", + confirmation.message_id, + ); + + unsafe { + (self.callback_delivery_confirmation_received)(Box::into_raw(Box::new(confirmation))); + } + } + + fn trigger_read_confirmation_received(&mut self, confirmation: Confirmation) { + debug!( + target: LOG_TARGET, + "Calling ReadConfirmationReceived callback function for message {:?}", + confirmation.message_id, + ); + + unsafe { + (self.callback_read_confirmation_received)(Box::into_raw(Box::new(confirmation))); + } + } } diff --git a/base_layer/chat_ffi/src/confirmation.rs b/base_layer/chat_ffi/src/confirmation.rs new file mode 100644 index 0000000000..93c1a2bdb4 --- /dev/null +++ b/base_layer/chat_ffi/src/confirmation.rs @@ -0,0 +1,147 @@ +// Copyright 2023, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{convert::TryFrom, ptr}; + +use libc::{c_int, c_uint}; +use tari_contacts::contacts_service::types::Confirmation; + +use crate::{ + error::{InterfaceError, LibChatError}, + types::{chat_byte_vector_create, ChatByteVector}, +}; + +/// Get a pointer to a ChatByteVector representation of a message id +/// +/// ## Arguments +/// `confirmation` - A pointer to the Confirmation +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut ChatByteVector` - A ptr to a ChatByteVector +/// +/// # Safety +/// The ```confirmation``` When done with the confirmation it should be destroyed +/// The ```ChatByteVector``` When done with the returned ChatByteVector it should be destroyed +#[no_mangle] +pub unsafe extern "C" fn read_confirmation_message_id( + confirmation: *mut Confirmation, + error_out: *mut c_int, +) -> *mut ChatByteVector { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if confirmation.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + let c = &(*confirmation); + let data_bytes = c.message_id.clone(); + let len = u32::try_from(data_bytes.len()).expect("Can't cast from usize"); + chat_byte_vector_create(data_bytes.as_ptr(), len as c_uint, error_out) +} + +/// Get a c_uint timestamp for the confirmation +/// +/// ## Arguments +/// `confirmation` - A pointer to the Confirmation +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `c_uint` - A uint representation of time. May return 0 if casting fails +/// +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn read_confirmation_timestamp(confirmation: *mut Confirmation, error_out: *mut c_int) -> c_uint { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if confirmation.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + let c = &(*confirmation); + c_uint::try_from(c.timestamp).unwrap_or(0) +} + +/// Frees memory for a Confirmation +/// +/// ## Arguments +/// `address` - The pointer of a Confirmation +/// +/// ## Returns +/// `()` - Does not return a value, equivalent to void in C +/// +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn destroy_confirmation(address: *mut Confirmation) { + if !address.is_null() { + drop(Box::from_raw(address)) + } +} + +#[cfg(test)] +mod test { + use tari_contacts::contacts_service::types::{Confirmation, MessageBuilder}; + use tari_utilities::epoch_time::EpochTime; + + use crate::{ + confirmation::{destroy_confirmation, read_confirmation_message_id, read_confirmation_timestamp}, + types::{chat_byte_vector_get_at, chat_byte_vector_get_length}, + }; + + #[test] + fn test_reading_from_confrimation() { + let message_id = MessageBuilder::new().build().message_id; + let timestamp = EpochTime::now().as_u64(); + let confirmation = Confirmation { + message_id: message_id.clone(), + timestamp, + }; + + let confirmation_ptr = Box::into_raw(Box::new(confirmation)); + let error_out = Box::into_raw(Box::new(0)); + + unsafe { + let id_byte_vec = read_confirmation_message_id(confirmation_ptr, error_out); + let len = chat_byte_vector_get_length(id_byte_vec, error_out); + + let mut read_id = vec![]; + for i in 0..len { + read_id.push(chat_byte_vector_get_at(id_byte_vec, i, error_out)); + } + + assert_eq!(message_id, read_id) + } + + unsafe { + let read_timestamp = read_confirmation_timestamp(confirmation_ptr, error_out); + assert_eq!(timestamp, u64::from(read_timestamp)) + } + + unsafe { destroy_confirmation(confirmation_ptr) } + } +} diff --git a/base_layer/chat_ffi/src/lib.rs b/base_layer/chat_ffi/src/lib.rs index 87903ba8a0..4f367c5d47 100644 --- a/base_layer/chat_ffi/src/lib.rs +++ b/base_layer/chat_ffi/src/lib.rs @@ -32,18 +32,25 @@ use tari_chat_client::{config::ApplicationConfig, networking::PeerFeatures, Chat use tokio::runtime::Runtime; use crate::{ - callback_handler::{CallbackHandler, CallbackMessageReceived}, + callback_handler::{ + CallbackDeliveryConfirmationReceived, + CallbackHandler, + CallbackMessageReceived, + CallbackReadConfirmationReceived, + }, error::{InterfaceError, LibChatError}, logging::init_logging, }; mod application_config; mod callback_handler; +mod confirmation; mod contacts; mod error; mod logging; mod message; mod message_metadata; +mod read_receipt; mod tansport_config; mod tari_address; mod types; @@ -79,6 +86,8 @@ pub unsafe extern "C" fn create_chat_client( error_out: *mut c_int, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, ) -> *mut ChatClientFFI { let mut error = 0; ptr::swap(error_out, &mut error as *mut c_int); @@ -137,6 +146,8 @@ pub unsafe extern "C" fn create_chat_client( client.shutdown.to_signal(), callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, ); runtime.spawn(async move { diff --git a/base_layer/chat_ffi/src/read_receipt.rs b/base_layer/chat_ffi/src/read_receipt.rs new file mode 100644 index 0000000000..55ae263958 --- /dev/null +++ b/base_layer/chat_ffi/src/read_receipt.rs @@ -0,0 +1,69 @@ +// Copyright 2023, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::ptr; + +use libc::c_int; +use tari_chat_client::ChatClient; +use tari_contacts::contacts_service::types::Message; + +use crate::{ + error::{InterfaceError, LibChatError}, + ChatClientFFI, +}; + +/// Sends a read confirmation for a given message +/// +/// ## Arguments +/// `client` - The chat client +/// `message` - The message that was read +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut TariAddress` - A ptr to a TariAddress +/// +/// # Safety +/// The ```ChatClientFFI``` When done with the client it should be destroyed +/// The ```Message``` When done with the Message it should be destroyed +#[no_mangle] +pub unsafe extern "C" fn send_read_confirmation_for_message( + client: *mut ChatClientFFI, + message: *mut Message, + error_out: *mut c_int, +) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if client.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + if message.is_null() { + error = LibChatError::from(InterfaceError::NullError("message".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + (*client) + .runtime + .block_on((*client).client.send_read_receipt((*message).clone())); +} diff --git a/base_layer/contacts/examples/chat_client/src/client.rs b/base_layer/contacts/examples/chat_client/src/client.rs index 9fe6b475e1..9f46f4d57d 100644 --- a/base_layer/contacts/examples/chat_client/src/client.rs +++ b/base_layer/contacts/examples/chat_client/src/client.rs @@ -49,6 +49,7 @@ pub trait ChatClient { fn create_message(&self, receiver: &TariAddress, message: String) -> Message; async fn get_messages(&self, sender: &TariAddress, limit: u64, page: u64) -> Vec; async fn send_message(&self, message: Message); + async fn send_read_receipt(&self, message: Message); fn identity(&self) -> &NodeIdentity; fn shutdown(&mut self); } @@ -171,6 +172,15 @@ impl ChatClient for Client { messages } + async fn send_read_receipt(&self, message: Message) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .send_read_confirmation(message.address.clone(), message.message_id) + .await + .expect("Read receipt not sent"); + } + } + fn create_message(&self, receiver: &TariAddress, message: String) -> Message { MessageBuilder::new().address(receiver.clone()).message(message).build() } diff --git a/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql b/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql index edfc520ef0..67d5f203b3 100644 --- a/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql +++ b/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql @@ -1 +1 @@ -ALTER TABLE contacts drop metadata; +ALTER TABLE messages drop metadata; diff --git a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql new file mode 100644 index 0000000000..7a105dcd03 --- /dev/null +++ b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE messages drop delivery_confirmation_at; +ALTER TABLE messages drop read_confirmation_at; diff --git a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql new file mode 100644 index 0000000000..b634514a74 --- /dev/null +++ b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql @@ -0,0 +1,2 @@ + ALTER TABLE messages ADD delivery_confirmation_at TIMESTAMP NULL; + ALTER TABLE messages ADD read_confirmation_at TIMESTAMP NULL; diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto index 41301eb01c..6770cb65b7 100644 --- a/base_layer/contacts/proto/message.proto +++ b/base_layer/contacts/proto/message.proto @@ -9,8 +9,7 @@ message Message { repeated MessageMetadata metadata = 2; bytes address = 3; DirectionEnum direction = 4; - uint64 stored_at = 5; - bytes message_id = 6; + bytes message_id = 5; } enum DirectionEnum { @@ -25,4 +24,17 @@ message MessageMetadata { enum MessageTypeEnum { TokenRequest = 0; -} \ No newline at end of file +} + +message Confirmation { + bytes message_id = 1; + uint64 timestamp = 2; +} + +message MessageDispatch { + oneof contents { + Message message = 1; + Confirmation delivery_confirmation = 2; + Confirmation read_confirmation = 3; + } +} diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 3ea70aeb89..d69c74d7eb 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -30,13 +30,14 @@ use chrono::{DateTime, Local, NaiveDateTime}; use tari_common_types::tari_address::TariAddress; use tari_comms::peer_manager::NodeId; use tari_service_framework::reply_channel::SenderService; +use tari_utilities::epoch_time::EpochTime; use tokio::sync::broadcast; use tower::Service; use crate::contacts_service::{ error::ContactsServiceError, service::{ContactMessageType, ContactOnlineStatus}, - types::{Contact, Message}, + types::{Confirmation, Contact, Message, MessageDispatch}, }; pub static DEFAULT_MESSAGE_LIMIT: u64 = 35; @@ -137,6 +138,7 @@ pub enum ContactsServiceRequest { GetContactOnlineStatus(Contact), SendMessage(TariAddress, Message), GetMessages(TariAddress, i64, i64), + SendReadConfirmation(TariAddress, Confirmation), } #[derive(Debug)] @@ -148,6 +150,7 @@ pub enum ContactsServiceResponse { OnlineStatus(ContactOnlineStatus), Messages(Vec), MessageSent, + ReadConfirmationSent, } #[derive(Clone)] @@ -155,7 +158,7 @@ pub struct ContactsServiceHandle { request_response_service: SenderService>, liveness_events: broadcast::Sender>, - message_events: broadcast::Sender>, + message_events: broadcast::Sender>, } impl ContactsServiceHandle { @@ -165,7 +168,7 @@ impl ContactsServiceHandle { Result, >, liveness_events: broadcast::Sender>, - message_events: broadcast::Sender>, + message_events: broadcast::Sender>, ) -> Self { Self { request_response_service, @@ -222,7 +225,7 @@ impl ContactsServiceHandle { self.liveness_events.subscribe() } - pub fn get_messages_event_stream(&self) -> broadcast::Receiver> { + pub fn get_messages_event_stream(&self) -> broadcast::Receiver> { self.message_events.subscribe() } @@ -282,4 +285,25 @@ impl ContactsServiceHandle { _ => Err(ContactsServiceError::UnexpectedApiResponse), } } + + pub async fn send_read_confirmation( + &mut self, + address: TariAddress, + message_id: Vec, + ) -> Result<(), ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::SendReadConfirmation( + address.clone(), + Confirmation { + message_id, + timestamp: EpochTime::now().as_u64(), + }, + )) + .await?? + { + ContactsServiceResponse::ReadConfirmationSent => Ok(()), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 47c57f6ac6..424276d9f2 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -32,7 +32,10 @@ use chrono::{NaiveDateTime, Utc}; use futures::{pin_mut, StreamExt}; use log::*; use tari_common_types::tari_address::TariAddress; -use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; +use tari_comms::{ + connectivity::{ConnectivityEvent, ConnectivityRequester}, + types::CommsPublicKey, +}; use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht}; use tari_p2p::{ comms_connector::SubscriptionFactory, @@ -53,7 +56,7 @@ use crate::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceRequest, ContactsServiceResponse}, proto, storage::database::{ContactsBackend, ContactsDatabase}, - types::{Contact, Message}, + types::{Confirmation, Contact, Message, MessageDispatch}, }; const LOG_TARGET: &str = "contacts::contacts_service"; @@ -130,7 +133,7 @@ where T: ContactsBackend + 'static dht: Dht, subscription_factory: Arc, event_publisher: broadcast::Sender>, - message_publisher: broadcast::Sender>, + message_publisher: broadcast::Sender>, number_of_rounds_no_pings: u16, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, @@ -151,7 +154,7 @@ where T: ContactsBackend + 'static dht: Dht, subscription_factory: Arc, event_publisher: broadcast::Sender>, - message_publisher: broadcast::Sender>, + message_publisher: broadcast::Sender>, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, ) -> Self { @@ -189,7 +192,7 @@ where T: ContactsBackend + 'static let chat_messages = self .subscription_factory .get_subscription(TariMessageType::Chat, SUBSCRIPTION_LABEL) - .map(map_decode::); + .map(map_decode::); pin_mut!(chat_messages); @@ -296,42 +299,25 @@ where T: ContactsBackend + 'static Ok(result.map(ContactsServiceResponse::Messages)?) }, ContactsServiceRequest::SendMessage(address, mut message) => { - let contact = match self.db.get_contact(address.clone()) { - Ok(contact) => contact, - Err(_) => Contact::from(&address), - }; - - let ob_message = OutboundDomainMessage::from(message.clone()); - let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); - - match self.get_online_status(&contact).await { - Ok(ContactOnlineStatus::Online) => { - info!(target: LOG_TARGET, "Chat message being sent directed"); - let mut comms_outbound = self.dht.outbound_requester(); - - comms_outbound - .send_direct_encrypted( - address.public_key().clone(), - ob_message, - encryption, - "contact service messaging".to_string(), - ) - .await?; - }, - Err(e) => return Err(e), - _ => { - let mut comms_outbound = self.dht.outbound_requester(); - comms_outbound - .closest_broadcast(address.public_key().clone(), encryption, vec![], ob_message) - .await?; - }, - } + let ob_message = OutboundDomainMessage::from(MessageDispatch::Message(message.clone())); message.stored_at = Utc::now().naive_utc().timestamp() as u64; self.db.save_message(message)?; + self.deliver_message(address, ob_message).await?; Ok(ContactsServiceResponse::MessageSent) }, + ContactsServiceRequest::SendReadConfirmation(address, confirmation) => { + let msg = OutboundDomainMessage::from(MessageDispatch::ReadConfirmation(confirmation.clone())); + trace!(target: LOG_TARGET, "Sending read confirmation with details: message_id: {:?}, timestamp: {:?}", confirmation.message_id, confirmation.timestamp); + + self.deliver_message(address, msg).await?; + + self.db + .confirm_message(confirmation.message_id.clone(), None, Some(confirmation.timestamp))?; + + Ok(ContactsServiceResponse::ReadConfirmationSent) + }, } } @@ -406,7 +392,7 @@ where T: ContactsBackend + 'static async fn handle_incoming_message( &mut self, - msg: DomainMessage>, + msg: DomainMessage>, ) -> Result<(), ContactsServiceError> { let msg_inner = match &msg.inner { Ok(msg) => msg.clone(), @@ -421,22 +407,17 @@ where T: ContactsBackend + 'static }, }; if let Some(source_public_key) = msg.authenticated_origin { - let message = Message::try_from(msg_inner).map_err(ContactsServiceError::MessageParsingError)?; - - let our_message = Message { - address: TariAddress::from_public_key(&source_public_key, message.address.network()), - stored_at: EpochTime::now().as_u64(), - ..message - }; - - self.db.save_message(our_message.clone())?; + let dispatch = MessageDispatch::try_from(msg_inner).map_err(ContactsServiceError::MessageParsingError)?; - let _msg = self.message_publisher.send(Arc::new(our_message)); + match dispatch { + MessageDispatch::Message(m) => self.handle_chat_message(m, source_public_key).await, + MessageDispatch::DeliveryConfirmation(_) | MessageDispatch::ReadConfirmation(_) => { + self.handle_confirmation(dispatch.clone()).await + }, + } } else { - return Err(ContactsServiceError::MessageSourceDoesNotMatchOrigin); + Err(ContactsServiceError::MessageSourceDoesNotMatchOrigin) } - - Ok(()) } async fn get_online_status(&self, contact: &Contact) -> Result { @@ -559,4 +540,105 @@ where T: ContactsBackend + 'static _ => {}, } } + + async fn handle_chat_message( + &mut self, + message: Message, + source_public_key: CommsPublicKey, + ) -> Result<(), ContactsServiceError> { + let our_message = Message { + address: TariAddress::from_public_key(&source_public_key, message.address.network()), + stored_at: EpochTime::now().as_u64(), + ..message + }; + + match self.db.save_message(our_message.clone()) { + Ok(..) => { + let _msg = self + .message_publisher + .send(Arc::new(MessageDispatch::Message(our_message.clone()))); + + // Send a delivery notification + self.create_and_send_delivery_confirmation_for_msg(&our_message).await?; + + Ok(()) + }, + Err(e) => Err(e.into()), + } + } + + async fn create_and_send_delivery_confirmation_for_msg( + &mut self, + message: &Message, + ) -> Result<(), ContactsServiceError> { + let address = &message.address; + let delivery_time = EpochTime::now().as_u64(); + let confirmation = MessageDispatch::DeliveryConfirmation(Confirmation { + message_id: message.message_id.clone(), + timestamp: delivery_time, + }); + let msg = OutboundDomainMessage::from(confirmation); + + self.deliver_message(address.clone(), msg).await?; + + self.db + .confirm_message(message.message_id.clone(), Some(delivery_time), None)?; + + Ok(()) + } + + async fn handle_confirmation(&mut self, dispatch: MessageDispatch) -> Result<(), ContactsServiceError> { + let (message_id, delivery, read) = match dispatch.clone() { + MessageDispatch::DeliveryConfirmation(c) => (c.message_id, Some(c.timestamp), None), + MessageDispatch::ReadConfirmation(c) => (c.message_id, None, Some(c.timestamp)), + _ => { + return Err(ContactsServiceError::MessageParsingError( + "Incorrect confirmation type".to_string(), + )) + }, + }; + + trace!(target: LOG_TARGET, "Handling confirmation with details: message_id: {:?}, delivery: {:?}, read: {:?}", message_id, delivery, read); + self.db.confirm_message(message_id, delivery, read)?; + let _msg = self.message_publisher.send(Arc::new(dispatch)); + + Ok(()) + } + + async fn deliver_message( + &mut self, + address: TariAddress, + message: OutboundDomainMessage, + ) -> Result<(), ContactsServiceError> { + let contact = match self.db.get_contact(address.clone()) { + Ok(contact) => contact, + Err(_) => Contact::from(&address), + }; + let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); + + match self.get_online_status(&contact).await { + Ok(ContactOnlineStatus::Online) => { + info!(target: LOG_TARGET, "Chat message being sent directed"); + let mut comms_outbound = self.dht.outbound_requester(); + + comms_outbound + .send_direct_encrypted( + address.public_key().clone(), + message, + encryption, + "contact service messaging".to_string(), + ) + .await?; + }, + Err(e) => return Err(e), + _ => { + let mut comms_outbound = self.dht.outbound_requester(); + comms_outbound + .closest_broadcast(address.public_key().clone(), encryption, vec![], message) + .await?; + }, + }; + + Ok(()) + } } diff --git a/base_layer/contacts/src/contacts_service/storage/database.rs b/base_layer/contacts/src/contacts_service/storage/database.rs index a80de6fe74..e7c8ae3caf 100644 --- a/base_layer/contacts/src/contacts_service/storage/database.rs +++ b/base_layer/contacts/src/contacts_service/storage/database.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + convert::TryFrom, fmt::{Display, Error, Formatter}, sync::Arc, }; @@ -50,6 +51,7 @@ pub enum DbKey { Contact(TariAddress), ContactId(NodeId), Contacts, + Message(Vec), Messages(TariAddress, i64, i64), } @@ -64,6 +66,7 @@ pub enum DbValue { #[allow(clippy::large_enum_variant)] pub enum DbKeyValuePair { Contact(TariAddress, Contact), + MessageConfirmations(Vec, Option, Option), LastSeen(NodeId, NaiveDateTime, Option), } @@ -188,6 +191,38 @@ where T: ContactsBackend + 'static Ok(()) } + + pub fn confirm_message( + &self, + message_id: Vec, + delivery_confirmation: Option, + read_confirmation: Option, + ) -> Result<(), ContactsServiceStorageError> { + let mut delivery = None; + if let Some(timestamp) = delivery_confirmation { + let secs = i64::try_from(timestamp).map_err(|_e| ContactsServiceStorageError::ConversionError)?; + delivery = Some( + NaiveDateTime::from_timestamp_opt(secs, 0) + .ok_or_else(|| ContactsServiceStorageError::ConversionError)?, + ) + }; + + let mut read = None; + if let Some(timestamp) = read_confirmation { + let secs = i64::try_from(timestamp).map_err(|_e| ContactsServiceStorageError::ConversionError)?; + read = Some( + NaiveDateTime::from_timestamp_opt(secs, 0) + .ok_or_else(|| ContactsServiceStorageError::ConversionError)?, + ) + }; + + self.db + .write(WriteOperation::Upsert(Box::new(DbKeyValuePair::MessageConfirmations( + message_id, delivery, read, + ))))?; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -203,6 +238,7 @@ impl Display for DbKey { DbKey::ContactId(id) => f.write_str(&format!("Contact: {:?}", id)), DbKey::Contacts => f.write_str("Contacts"), DbKey::Messages(c, _l, _p) => f.write_str(&format!("Messages for id: {:?}", c)), + DbKey::Message(m) => f.write_str(&format!("Message for id: {:?}", m)), } } } diff --git a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs index aba31a0da0..4de0cdf77b 100644 --- a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs +++ b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs @@ -34,7 +34,7 @@ use crate::contacts_service::{ database::{ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, types::{ contacts::{ContactSql, UpdateContact}, - messages::{MessagesSql, MessagesSqlInsert}, + messages::{MessageUpdate, MessagesSql, MessagesSqlInsert}, }, }, types::{Contact, Message}, @@ -116,6 +116,11 @@ where TContactServiceDbConnection: PooledDbConnection return Err(e), } }, + DbKey::Message(id) => match MessagesSql::find_by_message_id(&id.to_vec(), &mut conn) { + Ok(c) => Some(DbValue::Message(Box::new(Message::try_from(c)?))), + Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => None, + Err(e) => return Err(e), + }, }; Ok(result) @@ -126,6 +131,16 @@ where TContactServiceDbConnection: PooledDbConnection match *kvp { + DbKeyValuePair::MessageConfirmations(k, d, r) => { + if MessagesSql::find_by_message_id_and_update(&mut conn, &k, MessageUpdate { + delivery_confirmation_at: d, + read_confirmation_at: r, + }) + .is_err() + { + MessagesSql::find_by_message_id(&k, &mut conn)?; + } + }, DbKeyValuePair::Contact(k, c) => { if ContactSql::find_by_address_and_update(&mut conn, &k.to_bytes(), UpdateContact { alias: Some(c.clone().alias), @@ -155,6 +170,9 @@ where TContactServiceDbConnection: PooledDbConnection return Err(ContactsServiceStorageError::OperationNotSupported), + DbKeyValuePair::MessageConfirmations(..) => { + return Err(ContactsServiceStorageError::OperationNotSupported) + }, }, WriteOperation::Remove(k) => match k { DbKey::Contact(k) => match ContactSql::find_by_address_and_delete(&mut conn, &k.to_bytes()) { @@ -173,6 +191,7 @@ where TContactServiceDbConnection: PooledDbConnection return Err(ContactsServiceStorageError::OperationNotSupported), DbKey::Messages(_pk, _l, _p) => return Err(ContactsServiceStorageError::OperationNotSupported), + DbKey::Message(_id) => return Err(ContactsServiceStorageError::OperationNotSupported), }, WriteOperation::Insert(i) => { if let DbValue::Message(m) = *i { diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs index 47506388bd..091bc23a18 100644 --- a/base_layer/contacts/src/contacts_service/storage/types/messages.rs +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -25,6 +25,7 @@ use std::convert::TryFrom; use chrono::NaiveDateTime; use diesel::{prelude::*, SqliteConnection}; use serde_json; +use tari_common_sqlite::util::diesel_ext::ExpectedRowsExtension; use tari_common_types::tari_address::TariAddress; use crate::{ @@ -57,8 +58,17 @@ pub struct MessagesSql { pub body: Vec, pub metadata: Vec, pub stored_at: NaiveDateTime, + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, pub direction: i32, } +#[derive(Clone, Debug, AsChangeset, PartialEq, Eq)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessageUpdate { + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, +} impl MessagesSqlInsert { /// Write this struct to the database @@ -85,6 +95,30 @@ impl MessagesSql { .limit(limit) .load::(conn)?) } + + /// Find a particular message by its message_id + pub fn find_by_message_id( + message_id: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(messages::table + .filter(messages::message_id.eq(message_id)) + .first::(conn)?) + } + + /// Find a particular Message by message_id, and update it if it exists, returning the affected record + pub fn find_by_message_id_and_update( + conn: &mut SqliteConnection, + message_id: &[u8], + updated_message: MessageUpdate, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(messages::table.filter(messages::message_id.eq(message_id))) + .set(updated_message) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + MessagesSql::find_by_message_id(message_id, conn) + } } /// Conversion from an Message to the Sql datatype form @@ -106,6 +140,8 @@ impl TryFrom for Message { ) .unwrap_or_else(|| panic!("Direction from byte {}", o.direction)), stored_at: o.stored_at.timestamp() as u64, + delivery_confirmation_at: Some(o.stored_at.timestamp() as u64), + read_confirmation_at: Some(o.stored_at.timestamp() as u64), body: o.body, metadata, message_id: o.message_id, diff --git a/base_layer/contacts/src/contacts_service/types/confirmation.rs b/base_layer/contacts/src/contacts_service/types/confirmation.rs new file mode 100644 index 0000000000..f41b06a555 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/confirmation.rs @@ -0,0 +1,47 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::contacts_service::proto; + +#[derive(Clone, Debug, Default)] +pub struct Confirmation { + pub message_id: Vec, + pub timestamp: u64, +} + +impl From for Confirmation { + fn from(confirmation: proto::Confirmation) -> Self { + Self { + message_id: confirmation.message_id, + timestamp: confirmation.timestamp, + } + } +} + +impl From for proto::Confirmation { + fn from(confirmation: Confirmation) -> Self { + Self { + message_id: confirmation.message_id, + timestamp: confirmation.timestamp, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs index 46d9fc3767..9dac4657ad 100644 --- a/base_layer/contacts/src/contacts_service/types/message.rs +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -39,6 +39,8 @@ pub struct Message { pub address: TariAddress, pub direction: Direction, pub stored_at: u64, + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, pub message_id: Vec, } @@ -108,8 +110,8 @@ impl TryFrom for Message { address: TariAddress::from_bytes(&message.address).map_err(|e| e.to_string())?, // A Message from a proto::Message will always be an inbound message direction: Direction::Inbound, - stored_at: message.stored_at, message_id: message.message_id, + ..Message::default() }) } } @@ -125,7 +127,6 @@ impl From for proto::Message { .collect(), address: message.address.to_bytes().to_vec(), direction: i32::from(message.direction.as_byte()), - stored_at: message.stored_at, message_id: message.message_id, } } diff --git a/base_layer/contacts/src/contacts_service/types/message_dispatch.rs b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs new file mode 100644 index 0000000000..0f6d81952b --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs @@ -0,0 +1,77 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use tari_comms_dht::domain_message::OutboundDomainMessage; +use tari_p2p::tari_message::TariMessageType; + +use crate::contacts_service::{ + proto, + types::{Confirmation, Message}, +}; + +#[derive(Clone)] +pub enum MessageDispatch { + Message(Message), + DeliveryConfirmation(Confirmation), + ReadConfirmation(Confirmation), +} + +impl TryFrom for MessageDispatch { + type Error = String; + + fn try_from(dispatch: proto::MessageDispatch) -> Result { + Ok(match dispatch.contents { + Some(proto::message_dispatch::Contents::Message(m)) => MessageDispatch::Message(Message::try_from(m)?), + Some(proto::message_dispatch::Contents::DeliveryConfirmation(c)) => { + MessageDispatch::DeliveryConfirmation(Confirmation::from(c)) + }, + Some(proto::message_dispatch::Contents::ReadConfirmation(c)) => { + MessageDispatch::ReadConfirmation(Confirmation::from(c)) + }, + None => return Err("We didn't get any known type of chat message".to_string()), + }) + } +} + +impl From for proto::MessageDispatch { + fn from(dispatch: MessageDispatch) -> Self { + let content = match dispatch { + MessageDispatch::Message(m) => proto::message_dispatch::Contents::Message(m.into()), + MessageDispatch::DeliveryConfirmation(c) => { + proto::message_dispatch::Contents::DeliveryConfirmation(c.into()) + }, + MessageDispatch::ReadConfirmation(c) => proto::message_dispatch::Contents::ReadConfirmation(c.into()), + }; + + Self { + contents: Some(content), + } + } +} + +impl From for OutboundDomainMessage { + fn from(dispatch: MessageDispatch) -> Self { + Self::new(&TariMessageType::Chat, dispatch.into()) + } +} diff --git a/base_layer/contacts/src/contacts_service/types/mod.rs b/base_layer/contacts/src/contacts_service/types/mod.rs index d2bc7105af..16fdb38a46 100644 --- a/base_layer/contacts/src/contacts_service/types/mod.rs +++ b/base_layer/contacts/src/contacts_service/types/mod.rs @@ -28,3 +28,9 @@ pub use message::{Direction, Message, MessageMetadata, MessageMetadataType}; mod message_builder; pub use message_builder::MessageBuilder; + +mod message_dispatch; +pub use message_dispatch::MessageDispatch; + +mod confirmation; +pub use confirmation::Confirmation; diff --git a/base_layer/contacts/src/schema.rs b/base_layer/contacts/src/schema.rs index f3b921bca3..9a57007c44 100644 --- a/base_layer/contacts/src/schema.rs +++ b/base_layer/contacts/src/schema.rs @@ -18,6 +18,8 @@ diesel::table! { body -> Binary, metadata -> Binary, stored_at -> Timestamp, + delivery_confirmation_at -> Nullable, + read_confirmation_at -> Nullable, direction -> Integer, } } diff --git a/integration_tests/src/chat_client.rs b/integration_tests/src/chat_client.rs index 6b4516b074..1bf22cbaf5 100644 --- a/integration_tests/src/chat_client.rs +++ b/integration_tests/src/chat_client.rs @@ -73,6 +73,7 @@ pub fn test_config(address: Multiaddr) -> ApplicationConfig { chat_client_config.p2p.transport.tcp.listener_address = address.clone(); chat_client_config.p2p.public_addresses = MultiaddrList::from(vec![address]); chat_client_config.log_path = Some(PathBuf::from("log/chat_client/chat_client.log")); + chat_client_config.log_verbosity = Some(11); ApplicationConfig { chat_client: chat_client_config, diff --git a/integration_tests/src/chat_ffi.rs b/integration_tests/src/chat_ffi.rs index c8a229d86c..03ea45ff50 100644 --- a/integration_tests/src/chat_ffi.rs +++ b/integration_tests/src/chat_ffi.rs @@ -58,6 +58,16 @@ extern "C" fn callback_message_received(_state: *mut c_void) { *callback.message_received.lock().unwrap() += 1; } +extern "C" fn callback_delivery_confirmation_received(_state: *mut c_void) { + let callback = ChatCallback::instance(); + *callback.delivery_confirmation_received.lock().unwrap() += 1; +} + +extern "C" fn callback_read_confirmation_received(_state: *mut c_void) { + let callback = ChatCallback::instance(); + *callback.read_confirmation_received.lock().unwrap() += 1; +} + #[cfg_attr(windows, link(name = "minotari_chat_ffi.dll"))] #[cfg_attr(not(windows), link(name = "minotari_chat_ffi"))] extern "C" { @@ -66,6 +76,8 @@ extern "C" { error_out: *const c_int, callback_contact_status_change: unsafe extern "C" fn(*mut c_void), callback_message_received: unsafe extern "C" fn(*mut c_void), + callback_delivery_confirmation_received: unsafe extern "C" fn(*mut c_void), + callback_read_confirmation_received: unsafe extern "C" fn(*mut c_void), ) -> *mut ClientFFI; pub fn create_chat_message(receiver: *mut c_void, message: *const c_char, error_out: *const c_int) -> *mut c_void; pub fn send_chat_message(client: *mut ClientFFI, message: *mut c_void, error_out: *const c_int); @@ -90,6 +102,7 @@ extern "C" { element_count: c_uint, error_our: *const c_int, ) -> *mut c_void; + pub fn send_read_confirmation_for_message(client: *mut ClientFFI, message: *mut c_void, error_out: *const c_int); } #[derive(Debug)] @@ -188,6 +201,16 @@ impl ChatClient for ChatFFI { } } + async fn send_read_receipt(&self, message: Message) { + let client = self.ptr.lock().unwrap(); + let message_ptr = Box::into_raw(Box::new(message)) as *mut c_void; + let error_out = Box::into_raw(Box::new(0)); + + unsafe { + send_read_confirmation_for_message(client.0, message_ptr, error_out); + } + } + fn identity(&self) -> &NodeIdentity { &self.identity } @@ -241,6 +264,8 @@ pub async fn spawn_ffi_chat_client(name: &str, seed_peers: Vec, base_dir: error_out, callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, ); } @@ -257,6 +282,8 @@ static START: Once = Once::new(); pub struct ChatCallback { pub contact_status_change: Mutex, pub message_received: Mutex, + pub delivery_confirmation_received: Mutex, + pub read_confirmation_received: Mutex, } impl ChatCallback { diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature index f18d34031f..35323135d1 100644 --- a/integration_tests/tests/features/Chat.feature +++ b/integration_tests/tests/features/Chat.feature @@ -48,3 +48,19 @@ Feature: Chat messaging Then CHAT_A will have 2 messages with CHAT_B Then CHAT_A will have 1 messages with CHAT_C + Scenario: A message receives a delivery receipt + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: A message receives a read receipt + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps diff --git a/integration_tests/tests/features/ChatFFI.feature b/integration_tests/tests/features/ChatFFI.feature index 4a98580b69..34cc37d58d 100644 --- a/integration_tests/tests/features/ChatFFI.feature +++ b/integration_tests/tests/features/ChatFFI.feature @@ -19,6 +19,25 @@ Feature: Chat FFI messaging Then there will be a MessageReceived callback of at least 1 Then CHAT_B will have 1 message with CHAT_A + Scenario: Callback for delivery confirmation received + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then there will be a DeliveryConfirmationReceived callback of at least 1 + Then CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: Callback for read confirmation received + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then there will be a ReadConfirmationReceived callback of at least 1 + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps + # Also flaky on CI. Seems liveness has issues on CI @broken Scenario: Callback for status change is received @@ -55,3 +74,20 @@ Feature: Chat FFI messaging Then CHAT_B will have 2 messages with CHAT_A Then CHAT_A will have 2 messages with CHAT_B Then CHAT_A will have a replied to message from CHAT_B with 'oh hai' + + Scenario: A message receives a delivery receipt via FFI + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: A message receives a read receipt via FFI + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps \ No newline at end of file diff --git a/integration_tests/tests/steps/chat_ffi_steps.rs b/integration_tests/tests/steps/chat_ffi_steps.rs index ee43e8a1cf..b3148d817a 100644 --- a/integration_tests/tests/steps/chat_ffi_steps.rs +++ b/integration_tests/tests/steps/chat_ffi_steps.rs @@ -81,6 +81,44 @@ async fn message_reveived_callback(_world: &mut TariWorld, callback_count: usize ); } +#[then(expr = "there will be a DeliveryConfirmationReceived callback of at least {int}")] +async fn delivery_confirmation_reveived_callback(_world: &mut TariWorld, callback_count: usize) { + let mut count = 0; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + count = *ChatCallback::instance().delivery_confirmation_received.lock().unwrap(); + + if count >= callback_count as u64 { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "contact status update never received. Callbacks expected: {}, Callbacks received: {:?}", + callback_count, count + ); +} + +#[then(expr = "there will be a ReadConfirmationReceived callback of at least {int}")] +async fn read_confirmation_received_callback(_world: &mut TariWorld, callback_count: usize) { + let mut count = 0; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + count = *ChatCallback::instance().read_confirmation_received.lock().unwrap(); + + if count >= callback_count as u64 { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "contact status update never received. Callbacks expected: {}, Callbacks received: {:?}", + callback_count, count + ); +} + #[then(expr = "I can shutdown {word} without a problem")] async fn can_shutdown(world: &mut TariWorld, name: String) { let mut client = world.chat_clients.remove(&name).unwrap(); diff --git a/integration_tests/tests/steps/chat_steps.rs b/integration_tests/tests/steps/chat_steps.rs index 7dc277e2b0..9ae9ec2146 100644 --- a/integration_tests/tests/steps/chat_steps.rs +++ b/integration_tests/tests/steps/chat_steps.rs @@ -114,6 +114,7 @@ async fn i_reply_to_message( panic!("Never received incoming chat message",) } +#[when(expr = "{word} will have {int} message(s) with {word}")] #[then(expr = "{word} will have {int} message(s) with {word}")] async fn receive_n_messages(world: &mut TariWorld, receiver: String, message_count: u64, sender: String) { let receiver = world.chat_clients.get(&receiver).unwrap(); @@ -224,3 +225,131 @@ async fn have_replied_message(world: &mut TariWorld, receiver: String, sender: S panic!("Never received incoming chat message",) } + +#[then(regex = r"^(.+) and (.+) will have a message '(.+)' with matching delivery timestamps")] +async fn matching_delivery_timestamps(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&sender).unwrap(); + let client_2 = world.chat_clients.get(&receiver).unwrap(); + let client_1_address = TariAddress::from_public_key(client_1.identity().public_key(), Network::LocalNet); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let client_1_messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_1_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_1_message = client_1_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let client_2_messages: Vec = (*client_2) + .get_messages(&client_1_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_2_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_2_message = client_2_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + assert_eq!( + client_1_message.delivery_confirmation_at.unwrap(), + client_2_message.delivery_confirmation_at.unwrap() + ); + + return; + } + + panic!("Never received incoming chat message",) +} + +#[then(regex = r"^(.+) and (.+) will have a message '(.+)' with matching read timestamps")] +async fn matching_read_timestamps(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&sender).unwrap(); + let client_2 = world.chat_clients.get(&receiver).unwrap(); + let client_1_address = TariAddress::from_public_key(client_1.identity().public_key(), Network::LocalNet); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let client_1_messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_1_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_1_message = client_1_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let client_2_messages: Vec = (*client_2) + .get_messages(&client_1_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_2_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_2_message = client_2_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + if client_1_message.read_confirmation_at.is_none() || client_2_message.read_confirmation_at.is_none() { + continue; + } + + assert_eq!( + client_1_message.read_confirmation_at.unwrap(), + client_2_message.read_confirmation_at.unwrap() + ); + + return; + } + + panic!("Never received incoming chat message",) +} + +#[when(regex = r"^(.+) sends a read receipt to (.+) for message '(.+)'")] +async fn send_read_receipt(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&receiver).unwrap(); + let client_2 = world.chat_clients.get(&sender).unwrap(); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let message = messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + client_1.send_read_receipt(message).await; + } +}