Skip to content

Commit

Permalink
feat: new message callback to chat-ffi (tari-project#5592)
Browse files Browse the repository at this point in the history
Description
---
Adds a callback function to the chat ffi when a new message is received

Motivation and Context
---
So ffi consumers don't need to poll for new messages.

How Has This Been Tested?
---
Cucumber

What process can a PR reviewer use to test or verify this change?
---
Review the general implementation style.

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify
  • Loading branch information
brianp authored Jul 14, 2023
1 parent f68b85f commit bbd543e
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 8 deletions.
7 changes: 6 additions & 1 deletion base_layer/chat_ffi/chat.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ struct ClientFFI;

struct ContactsLivenessData;

struct Message;

struct TariAddress;

typedef void (*CallbackContactStatusChange)(struct ContactsLivenessData*);

typedef void (*CallbackMessageReceived)(struct Message*);

#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
Expand All @@ -40,7 +44,8 @@ extern "C" {
struct ClientFFI *create_chat_client(ApplicationConfig *config,
const char *identity_file_path,
int *error_out,
CallbackContactStatusChange callback_contact_status_change);
CallbackContactStatusChange callback_contact_status_change,
CallbackMessageReceived callback_message_received);

/**
* Frees memory for a ClientFFI
Expand Down
33 changes: 31 additions & 2 deletions base_layer/chat_ffi/src/callback_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,22 @@
use std::ops::Deref;

use log::{debug, info, trace};
use tari_contacts::contacts_service::handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceHandle};
use tari_contacts::contacts_service::{
handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceHandle},
types::Message,
};
use tari_shutdown::ShutdownSignal;

const LOG_TARGET: &str = "chat_ffi::callback_handler";

pub(crate) type CallbackContactStatusChange = unsafe extern "C" fn(*mut ContactsLivenessData);
pub(crate) type CallbackMessageReceived = unsafe extern "C" fn(*mut Message);

#[derive(Clone)]
pub struct CallbackHandler {
contacts_service_handle: ContactsServiceHandle,
callback_contact_status_change: CallbackContactStatusChange,
callback_message_received: CallbackMessageReceived,
shutdown: ShutdownSignal,
}

Expand All @@ -42,19 +47,32 @@ impl CallbackHandler {
contacts_service_handle: ContactsServiceHandle,
shutdown: ShutdownSignal,
callback_contact_status_change: CallbackContactStatusChange,
callback_message_received: CallbackMessageReceived,
) -> Self {
Self {
contacts_service_handle,
shutdown,
callback_contact_status_change,
callback_message_received,
}
}

pub(crate) async fn start(&mut self) {
let mut liveness_events = self.contacts_service_handle.get_contacts_liveness_event_stream();
let mut chat_messages = self.contacts_service_handle.get_messages_event_stream();

loop {
tokio::select! {
rec_message = chat_messages.recv() => {
match rec_message {
Ok(message) => {
trace!(target: LOG_TARGET, "FFI Callback monitor received a new Message");
self.trigger_message_received(message.deref().clone());
},
Err(_) => { debug!(target: LOG_TARGET, "FFI Callback monitor had an error receiving new messages")}
}
},

event = liveness_events.recv() => {
match event {
Ok(liveness_event) => {
Expand Down Expand Up @@ -82,11 +100,22 @@ impl CallbackHandler {
fn trigger_contact_status_change(&mut self, data: ContactsLivenessData) {
debug!(
target: LOG_TARGET,
"Calling Contacts Liveness Data Updated callback function for contact {}",
"Calling ContactStatusChanged callback function for contact {}",
data.address(),
);
unsafe {
(self.callback_contact_status_change)(Box::into_raw(Box::new(data)));
}
}

fn trigger_message_received(&mut self, message: Message) {
debug!(
target: LOG_TARGET,
"Calling MessageReceived callback function for sender {}",
message.address,
);
unsafe {
(self.callback_message_received)(Box::into_raw(Box::new(message)));
}
}
}
4 changes: 3 additions & 1 deletion base_layer/chat_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tari_contacts::contacts_service::types::Message;
use tokio::runtime::Runtime;

use crate::{
callback_handler::CallbackHandler,
callback_handler::{CallbackHandler, CallbackMessageReceived},
error::{InterfaceError, LibChatError},
};

Expand Down Expand Up @@ -73,6 +73,7 @@ pub unsafe extern "C" fn create_chat_client(
identity_file_path: *const c_char,
error_out: *mut c_int,
callback_contact_status_change: CallbackContactStatusChange,
callback_message_received: CallbackMessageReceived,
) -> *mut ClientFFI {
let mut error = 0;
ptr::swap(error_out, &mut error as *mut c_int);
Expand Down Expand Up @@ -122,6 +123,7 @@ pub unsafe extern "C" fn create_chat_client(
client.contacts.clone().expect("No contacts service loaded yet"),
client.shutdown.to_signal(),
callback_contact_status_change,
callback_message_received,
);

runtime.spawn(async move {
Expand Down
7 changes: 7 additions & 0 deletions base_layer/contacts/src/contacts_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub struct ContactsServiceHandle {
request_response_service:
SenderService<ContactsServiceRequest, Result<ContactsServiceResponse, ContactsServiceError>>,
liveness_events: broadcast::Sender<Arc<ContactsLivenessEvent>>,
message_events: broadcast::Sender<Arc<Message>>,
}

impl ContactsServiceHandle {
Expand All @@ -159,10 +160,12 @@ impl ContactsServiceHandle {
Result<ContactsServiceResponse, ContactsServiceError>,
>,
liveness_events: broadcast::Sender<Arc<ContactsLivenessEvent>>,
message_events: broadcast::Sender<Arc<Message>>,
) -> Self {
Self {
request_response_service,
liveness_events,
message_events,
}
}

Expand Down Expand Up @@ -214,6 +217,10 @@ impl ContactsServiceHandle {
self.liveness_events.subscribe()
}

pub fn get_messages_event_stream(&self) -> broadcast::Receiver<Arc<Message>> {
self.message_events.subscribe()
}

/// Determines the contact's online status based on their last seen time
pub async fn get_contact_online_status(
&mut self,
Expand Down
8 changes: 5 additions & 3 deletions base_layer/contacts/src/contacts_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ impl<T> ServiceInitializer for ContactsServiceInitializer<T>
where T: ContactsBackend + 'static
{
async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
let (sender, receiver) = reply_channel::unbounded();
let (liveness_tx, liveness_rx) = reply_channel::unbounded();
let (publisher, _) = broadcast::channel(250);
let (message_publisher, _) = broadcast::channel(250);

let contacts_handle = ContactsServiceHandle::new(sender, publisher.clone());
let contacts_handle = ContactsServiceHandle::new(liveness_tx, publisher.clone(), message_publisher.clone());

// Register handle before waiting for handles to be ready
context.register_handle(contacts_handle);
Expand All @@ -108,13 +109,14 @@ where T: ContactsBackend + 'static

let service = ContactsService::new(
ContactsDatabase::new(backend),
receiver,
liveness_rx,
handles.get_shutdown_signal(),
liveness,
connectivity,
dht,
subscription_factory,
publisher,
message_publisher,
contacts_auto_ping_interval,
contacts_online_ping_window,
)
Expand Down
8 changes: 7 additions & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ where T: ContactsBackend + 'static
dht: Dht,
subscription_factory: Arc<SubscriptionFactory>,
event_publisher: broadcast::Sender<Arc<ContactsLivenessEvent>>,
message_publisher: broadcast::Sender<Arc<Message>>,
number_of_rounds_no_pings: u16,
contacts_auto_ping_interval: Duration,
contacts_online_ping_window: usize,
Expand All @@ -150,6 +151,7 @@ where T: ContactsBackend + 'static
dht: Dht,
subscription_factory: Arc<SubscriptionFactory>,
event_publisher: broadcast::Sender<Arc<ContactsLivenessEvent>>,
message_publisher: broadcast::Sender<Arc<Message>>,
contacts_auto_ping_interval: Duration,
contacts_online_ping_window: usize,
) -> Self {
Expand All @@ -163,6 +165,7 @@ where T: ContactsBackend + 'static
dht,
subscription_factory,
event_publisher,
message_publisher,
number_of_rounds_no_pings: 0,
contacts_auto_ping_interval,
contacts_online_ping_window,
Expand Down Expand Up @@ -415,7 +418,10 @@ where T: ContactsBackend + 'static
..msg.into()
};

self.db.save_message(message).expect("Couldn't save the message");
self.db
.save_message(message.clone())
.expect("Couldn't save the message");
let _msg = self.message_publisher.send(Arc::new(message));
}

Ok(())
Expand Down
8 changes: 8 additions & 0 deletions integration_tests/src/chat_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ extern "C" fn callback_contact_status_change(_state: *mut c_void) {
*callback.contact_status_change.lock().unwrap() += 1;
}

extern "C" fn callback_message_received(_state: *mut c_void) {
let callback = ChatCallback::instance();
*callback.message_received.lock().unwrap() += 1;
}

#[cfg_attr(windows, link(name = "tari_chat_ffi.dll"))]
#[cfg_attr(not(windows), link(name = "tari_chat_ffi"))]
extern "C" {
Expand All @@ -58,6 +63,7 @@ extern "C" {
identity_file_path: *const c_char,
out_error: *const c_int,
callback_contact_status_change: unsafe extern "C" fn(*mut c_void),
callback_message_received: unsafe extern "C" fn(*mut c_void),
) -> *mut ClientFFI;
pub fn send_message(client: *mut ClientFFI, receiver: *mut c_void, message: *const c_char, out_error: *const c_int);
pub fn add_contact(client: *mut ClientFFI, address: *mut c_void, out_error: *const c_int);
Expand Down Expand Up @@ -193,6 +199,7 @@ pub async fn spawn_ffi_chat_client(name: &str, seed_peers: Vec<Peer>, base_dir:
identity_path_c_char,
out_error,
callback_contact_status_change,
callback_message_received,
);
}

Expand All @@ -208,6 +215,7 @@ static START: Once = Once::new();
#[derive(Default)]
pub struct ChatCallback {
pub contact_status_change: Mutex<u64>,
pub message_received: Mutex<u64>,
}

impl ChatCallback {
Expand Down
8 changes: 8 additions & 0 deletions integration_tests/tests/features/ChatFFI.feature
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Feature: Chat FFI messaging
When I use CHAT_A to send a message 'Hey there' to CHAT_B
Then CHAT_B will have 1 message with CHAT_A

Scenario: Callback for new message received
Given I have a seed node SEED_A
When I have a chat FFI client CHAT_A connected to seed node SEED_A
When I have a chat FFI client CHAT_B connected to seed node SEED_A
When I use CHAT_A to send a message 'Hey there' to CHAT_B
Then there will be a MessageReceived callback of at least 1
Then CHAT_B will have 1 message with CHAT_A

# Also flaky on CI. Seems liveness has issues on CI
@broken
Scenario: Callback for status change is received
Expand Down
19 changes: 19 additions & 0 deletions integration_tests/tests/steps/chat_ffi_steps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,22 @@ async fn contact_status_update_callback(_world: &mut TariWorld, callback_count:
callback_count, count
);
}

#[then(expr = "there will be a MessageReceived callback of at least {int}")]
async fn message_reveived_callback(_world: &mut TariWorld, callback_count: usize) {
let mut count = 0;
for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) {
count = *ChatCallback::instance().message_received.lock().unwrap();

if count >= callback_count as u64 {
return;
}

tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await;
}

panic!(
"contact status update never received. Callbacks expected: {}, Callbacks received: {:?}",
callback_count, count
);
}

0 comments on commit bbd543e

Please sign in to comment.