diff --git a/rust/README.md b/rust/README.md index 8e5c1b7fe..dc80d1322 100644 --- a/rust/README.md +++ b/rust/README.md @@ -42,5 +42,5 @@ cargo run --example simple_consumer [codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients [crates-image]: https://img.shields.io/crates/v/rocketmq.svg [crates-url]: https://crates.io/crates/rocketmq -[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg +[rust-doc-image]: https://img.shields.io/docsrs/rocketmq [rust-doc-url]: https://docs.rs/rocketmq diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs index 07516eb04..7423cc9b6 100644 --- a/rust/examples/transaction_producer.rs +++ b/rust/examples/transaction_producer.rs @@ -14,11 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashSet; +use std::sync::Mutex; + use rocketmq::conf::{ClientOption, ProducerOption}; use rocketmq::model::message::MessageBuilder; use rocketmq::model::transaction::{Transaction, TransactionResolution}; use rocketmq::Producer; +lazy_static::lazy_static! { + static ref MESSAGE_ID_SET: Mutex> = Mutex::new(HashSet::new()); +} + #[tokio::main] async fn main() { // recommend to specify which topic(s) you would like to send message to @@ -30,16 +37,28 @@ async fn main() { let mut client_option = ClientOption::default(); client_option.set_access_url("localhost:8081"); - // build and start producer + // build and start transaction producer, which has TransactionChecker let mut producer = Producer::new_transaction_producer( producer_option, client_option, Box::new(|transaction_id, message| { - println!( - "receive transaction check request: transaction_id: {}, message: {:?}", - transaction_id, message - ); - TransactionResolution::COMMIT + if MESSAGE_ID_SET + .lock() + .unwrap() + .contains(message.message_id()) + { + println!( + "commit transaction: transaction_id: {}, message_id: {}", + transaction_id, message.message_id() + ); + TransactionResolution::COMMIT + } else { + println!( + "rollback transaction due to unknown message: transaction_id: {}, message_id: {}", + transaction_id, message.message_id() + ); + TransactionResolution::ROLLBACK + } }), ) .unwrap(); @@ -65,6 +84,14 @@ async fn main() { transaction.message_id(), transaction.transaction_id() ); + + MESSAGE_ID_SET + .lock() + .unwrap() + .insert(transaction.message_id().to_string()); + + // commit transaction manually + // delete following two lines so that RocketMQ server will check transaction status periodically let result = transaction.commit().await; debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result); } diff --git a/rust/src/client.rs b/rust/src/client.rs index 27cfb4df1..57bddb84b 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -31,7 +31,7 @@ use crate::conf::ClientOption; use crate::error::{ClientError, ErrorKind}; use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt}; use crate::model::message::{AckMessageEntry, MessageView}; -use crate::model::transaction::TransactionChecker; +use crate::model::transaction::{TransactionChecker, TransactionResolution}; use crate::pb; use crate::pb::receive_message_response::Content; use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand; @@ -109,6 +109,10 @@ impl Client { self.telemetry_command_tx.is_some() } + pub(crate) fn has_transaction_checker(&self) -> bool { + self.transaction_checker.is_some() + } + pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box) { if self.is_started() { panic!("client {} is started, can not be modified", self.id) @@ -116,7 +120,7 @@ impl Client { self.transaction_checker = Some(transaction_checker); } - pub(crate) async fn start(&mut self) { + pub(crate) async fn start(&mut self) -> Result<(), ClientError> { let logger = self.logger.clone(); let session_manager = self.session_manager.clone(); @@ -127,9 +131,13 @@ impl Client { // send heartbeat and handle telemetry command let (tx, mut rx) = mpsc::channel(16); self.telemetry_command_tx = Some(tx); - let rpc_client = self.get_session().await.unwrap(); + let rpc_client = self.get_session().await?; let endpoints = self.access_endpoints.clone(); let transaction_checker = self.transaction_checker.take(); + // give a placeholder + if transaction_checker.is_some() { + self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN)); + } tokio::spawn(async move { rpc_client.is_started(); let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); @@ -181,6 +189,7 @@ impl Client { } } }); + Ok(()) } async fn handle_telemetry_command( @@ -690,7 +699,7 @@ pub(crate) mod tests { .returning(|_, _, _| Ok(Session::mock())); let mut client = new_client_with_session_manager(session_manager); - client.start().await; + client.start().await?; // TODO use countdown latch instead sleeping // wait for run diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs index 62a4c9a76..2f74679f3 100644 --- a/rust/src/model/transaction.rs +++ b/rust/src/model/transaction.rs @@ -125,12 +125,18 @@ pub enum TransactionResolution { COMMIT = 1, /// Notify server that current transaction should be roll-backed. ROLLBACK = 2, - /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown + /// Notify server that the state of this transaction is not sure. You should be cautious before return unknown /// because the examination from the server will be performed periodically. UNKNOWN = 0, } /// A closure to check the state of transaction. +/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction. +/// +/// # Arguments +/// +/// * transaction id +/// * message pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync; #[cfg(test)] diff --git a/rust/src/producer.rs b/rust/src/producer.rs index fd82cdc4e..d5e768c5e 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -50,6 +50,7 @@ pub struct Producer { impl Producer { const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message"; + const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message"; /// Create a new producer instance /// @@ -79,7 +80,7 @@ impl Producer { /// /// * `option` - producer option /// * `client_option` - client option - /// * `transaction_checker` - A closure to check the state of transaction. + /// * `transaction_checker` - handle server query for uncommitted transaction status pub fn new_transaction_producer( option: ProducerOption, client_option: ClientOption, @@ -103,7 +104,7 @@ impl Producer { /// Start the producer pub async fn start(&mut self) -> Result<(), ClientError> { - self.client.start().await; + self.client.start().await?; if let Some(topics) = self.option.topics() { for topic in topics { self.client.topic_route(topic, true).await?; @@ -265,6 +266,13 @@ impl Producer { &self, mut message: impl message::Message, ) -> Result { + if !self.client.has_transaction_checker() { + return Err(ClientError::new( + ErrorKind::InvalidMessage, + "this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer", + Self::OPERATION_SEND_TRANSACTION_MESSAGE, + )); + } let topic = message.take_topic(); let receipt = self.send(message).await?; Ok(TransactionImpl::new( @@ -313,7 +321,7 @@ mod tests { queue: vec![], })) }); - client.expect_start().returning(|| ()); + client.expect_start().returning(|| Ok(())); client .expect_client_id() .return_const("fake_id".to_string()); @@ -340,7 +348,7 @@ mod tests { queue: vec![], })) }); - client.expect_start().returning(|| ()); + client.expect_start().returning(|| Ok(())); client.expect_set_transaction_checker().returning(|_| ()); client .expect_client_id() @@ -543,6 +551,10 @@ mod tests { .client .expect_get_session() .return_once(|| Ok(Session::mock())); + producer + .client + .expect_has_transaction_checker() + .return_once(|| true); let _ = producer .send_transaction_message( diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index 48e0ce9f7..d4e222ea1 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -90,7 +90,7 @@ impl SimpleConsumer { Self::OPERATION_START_SIMPLE_CONSUMER, )); } - self.client.start().await; + self.client.start().await?; if let Some(topics) = self.option.topics() { for topic in topics { self.client.topic_route(topic, true).await?; @@ -198,7 +198,7 @@ mod tests { queue: vec![], })) }); - client.expect_start().returning(|| ()); + client.expect_start().returning(|| Ok(())); client .expect_client_id() .return_const("fake_id".to_string());