diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs index 3e237c380..cca3baeaa 100644 --- a/rust/examples/delay_producer.rs +++ b/rust/examples/delay_producer.rs @@ -23,8 +23,8 @@ use rocketmq::Producer; #[tokio::main] async fn main() { - // recommend to specify which topic(s) you would like to send message to - // producer will prefetch topic route when start and failed fast if topic not exist + // It's recommended to specify the topics that applications will publish messages to + // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist let mut producer_option = ProducerOption::default(); producer_option.set_topics(vec!["delay_test"]); @@ -34,7 +34,11 @@ async fn main() { // build and start producer let mut producer = Producer::new(producer_option, client_option).unwrap(); - producer.start().await.unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } // build message let message = MessageBuilder::delay_message_builder( @@ -51,10 +55,23 @@ async fn main() { .unwrap(); // send message to rocketmq proxy - let result = producer.send(message).await; - debug_assert!(result.is_ok(), "send message failed: {:?}", result); + let send_result = producer.send(message).await; + if send_result.is_err() { + eprintln!("send message failed: {:?}", send_result.unwrap_err()); + return; + } println!( "send message success, message_id={}", - result.unwrap().message_id() + send_result.unwrap().message_id() ); + + // shutdown the producer when you don't need it anymore. + // recommend shutdown manually to gracefully stop and unregister from server + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } } diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs index 38562e2bc..211ae6836 100644 --- a/rust/examples/fifo_producer.rs +++ b/rust/examples/fifo_producer.rs @@ -20,8 +20,8 @@ use rocketmq::Producer; #[tokio::main] async fn main() { - // recommend to specify which topic(s) you would like to send message to - // producer will prefetch topic route when start and failed fast if topic not exist + // It's recommended to specify the topics that applications will publish messages to + // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist let mut producer_option = ProducerOption::default(); producer_option.set_topics(vec!["fifo_test"]); @@ -31,7 +31,11 @@ async fn main() { // build and start producer let mut producer = Producer::new(producer_option, client_option).unwrap(); - producer.start().await.unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } // build message let message = MessageBuilder::fifo_message_builder( @@ -44,10 +48,23 @@ async fn main() { .unwrap(); // send message to rocketmq proxy - let result = producer.send(message).await; - debug_assert!(result.is_ok(), "send message failed: {:?}", result); + let send_result = producer.send(message).await; + if send_result.is_err() { + eprintln!("send message failed: {:?}", send_result.unwrap_err()); + return; + } println!( "send message success, message_id={}", - result.unwrap().message_id() + send_result.unwrap().message_id() ); + + // shutdown the producer when you don't need it anymore. + // you should shutdown it manually to gracefully stop and unregister from server + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } } diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs index 335b31fb3..3e818e91a 100644 --- a/rust/examples/producer.rs +++ b/rust/examples/producer.rs @@ -20,8 +20,8 @@ use rocketmq::Producer; #[tokio::main] async fn main() { - // recommend to specify which topic(s) you would like to send message to - // producer will prefetch topic route when start and failed fast if topic not exist + // It's recommended to specify the topics that applications will publish messages to + // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist let mut producer_option = ProducerOption::default(); producer_option.set_topics(vec!["test_topic"]); @@ -31,7 +31,11 @@ async fn main() { // build and start producer let mut producer = Producer::new(producer_option, client_option).unwrap(); - producer.start().await.unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } // build message let message = MessageBuilder::builder() @@ -42,10 +46,23 @@ async fn main() { .unwrap(); // send message to rocketmq proxy - let result = producer.send(message).await; - debug_assert!(result.is_ok(), "send message failed: {:?}", result); + let send_result = producer.send(message).await; + if send_result.is_err() { + eprintln!("send message failed: {:?}", send_result.unwrap_err()); + return; + } println!( "send message success, message_id={}", - result.unwrap().message_id() + send_result.unwrap().message_id() ); + + // shutdown the producer when you don't need it anymore. + // you should shutdown it manually to gracefully stop and unregister from server + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_ok() { + eprintln!( + "producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } } diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs index a5f94085b..fc1a83884 100644 --- a/rust/examples/simple_consumer.rs +++ b/rust/examples/simple_consumer.rs @@ -20,8 +20,8 @@ use rocketmq::SimpleConsumer; #[tokio::main] async fn main() { - // recommend to specify which topic(s) you would like to send message to - // simple consumer will prefetch topic route when start and failed fast if topic not exist + // It's recommended to specify the topics that applications will publish messages to + // because the simple consumer will prefetch topic routes for them on start and fail fast in case they do not exist let mut consumer_option = SimpleConsumerOption::default(); consumer_option.set_topics(vec!["test_topic"]); consumer_option.set_consumer_group("SimpleConsumerGroup"); @@ -33,38 +33,54 @@ async fn main() { // build and start simple consumer let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap(); - consumer.start().await.unwrap(); - - loop { - // pop message from rocketmq proxy - let receive_result = consumer - .receive( - "test_topic".to_string(), - &FilterExpression::new(FilterType::Tag, "test_tag"), - ) - .await; - debug_assert!( - receive_result.is_ok(), - "receive message failed: {:?}", - receive_result.unwrap_err() + let start_result = consumer.start().await; + if start_result.is_err() { + eprintln!( + "simple consumer start failed: {:?}", + start_result.unwrap_err() ); + return; + } - let messages = receive_result.unwrap(); + // pop message from rocketmq proxy + let receive_result = consumer + .receive( + "test_topic".to_string(), + &FilterExpression::new(FilterType::Tag, "test_tag"), + ) + .await; + if receive_result.is_err() { + eprintln!("receive message failed: {:?}", receive_result.unwrap_err()); + return; + } - if messages.is_empty() { - println!("no message received"); - return; - } + let messages = receive_result.unwrap(); - for message in messages { - println!("receive message: {:?}", message); - // ack message to rocketmq proxy - let ack_result = consumer.ack(&message).await; - debug_assert!( - ack_result.is_ok(), - "ack message failed: {:?}", + if messages.is_empty() { + println!("no message received"); + return; + } + + for message in messages { + println!("receive message: {:?}", message); + // ack message to rocketmq proxy + let ack_result = consumer.ack(&message).await; + if ack_result.is_err() { + eprintln!( + "ack message {} failed: {:?}", + message.message_id(), ack_result.unwrap_err() ); } } + + // shutdown the simple consumer when you don't need it anymore. + // you should shutdown it manually to gracefully stop and unregister from server + let shutdown_result = consumer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "simple consumer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } } diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs index 7423cc9b6..6df6cb65a 100644 --- a/rust/examples/transaction_producer.rs +++ b/rust/examples/transaction_producer.rs @@ -28,8 +28,8 @@ lazy_static::lazy_static! { #[tokio::main] async fn main() { - // recommend to specify which topic(s) you would like to send message to - // producer will prefetch topic route when start and failed fast if topic not exist + // It's recommended to specify the topics that applications will publish messages to + // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist let mut producer_option = ProducerOption::default(); producer_option.set_topics(vec!["transaction_test"]); @@ -62,7 +62,11 @@ async fn main() { }), ) .unwrap(); - producer.start().await.unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } // build message let message = MessageBuilder::transaction_message_builder( @@ -93,5 +97,18 @@ async fn main() { // commit transaction manually // delete following two lines so that RocketMQ server will check transaction status periodically let result = transaction.commit().await; - debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result); + if result.is_err() { + eprintln!("commit transaction failed: {:?}", result.unwrap_err()); + return; + } + + // shutdown the producer when you don't need it anymore. + // you should shutdown it manually to gracefully stop and unregister from server + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "transaction producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } } diff --git a/rust/src/client.rs b/rust/src/client.rs index 57bddb84b..3804b4ae9 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -37,9 +37,9 @@ use crate::pb::receive_message_response::Content; use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand; use crate::pb::{ AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression, - HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest, - ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand, - TransactionSource, + HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest, + QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status, + TelemetryCommand, TransactionSource, }; #[double] use crate::session::SessionManager; @@ -55,6 +55,7 @@ pub(crate) struct Client { settings: TelemetryCommand, transaction_checker: Option>, telemetry_command_tx: Option>, + shutdown_tx: Option>, } lazy_static::lazy_static! { @@ -62,6 +63,8 @@ lazy_static::lazy_static! { } const OPERATION_CLIENT_NEW: &str = "client.new"; +const OPERATION_CLIENT_START: &str = "client.start"; +const OPERATION_CLIENT_SHUTDOWN: &str = "client.shutdown"; const OPERATION_GET_SESSION: &str = "client.get_session"; const OPERATION_QUERY_ROUTE: &str = "client.query_route"; const OPERATION_HEARTBEAT: &str = "client.heartbeat"; @@ -102,11 +105,12 @@ impl Client { settings, transaction_checker: None, telemetry_command_tx: None, + shutdown_tx: None, }) } pub(crate) fn is_started(&self) -> bool { - self.telemetry_command_tx.is_some() + self.shutdown_tx.is_some() } pub(crate) fn has_transaction_checker(&self) -> bool { @@ -124,20 +128,27 @@ impl Client { let logger = self.logger.clone(); let session_manager = self.session_manager.clone(); - let group = self.option.group.to_string(); + let group = self.option.group.clone(); let namespace = self.option.namespace.to_string(); let client_type = self.option.client_type.clone(); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + self.shutdown_tx = Some(shutdown_tx); + // send heartbeat and handle telemetry command - let (tx, mut rx) = mpsc::channel(16); - self.telemetry_command_tx = Some(tx); - let rpc_client = self.get_session().await?; + let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); + self.telemetry_command_tx = Some(telemetry_command_tx); + let rpc_client = self + .get_session() + .await + .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?; let endpoints = self.access_endpoints.clone(); let transaction_checker = self.transaction_checker.take(); // give a placeholder if transaction_checker.is_some() { self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN)); } + tokio::spawn(async move { rpc_client.is_started(); let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); @@ -178,17 +189,54 @@ impl Client { debug!(logger,"send heartbeat to server success, peer={}",peer); } }, - command = rx.recv() => { + command = telemetry_command_rx.recv() => { if let Some(command) = command { let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await; if let Err(error) = result { - error!(logger, "handle telemetry command failed: {:?}", error) + error!(logger, "handle telemetry command failed: {:?}", error); } } }, + _ = &mut shutdown_rx => { + debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler"); + break; + } } } + info!( + logger, + "heartbeat task and telemetry command handler are stopped" + ); + }); + Ok(()) + } + + fn check_started(&self, operation: &'static str) -> Result<(), ClientError> { + if !self.is_started() { + return Err(ClientError::new( + ErrorKind::ClientIsNotRunning, + "client is not started", + operation, + ) + .with_context("client_id", self.id.clone())); + } + Ok(()) + } + + pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> { + self.check_started(OPERATION_CLIENT_SHUTDOWN)?; + let mut rpc_client = self.get_session().await?; + self.telemetry_command_tx = None; + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + let group = self.option.group.as_ref().map(|group| Resource { + name: group.to_string(), + resource_namespace: self.option.namespace.to_string(), }); + let response = rpc_client.notify_shutdown(NotifyClientTerminationRequest { group }); + Self::handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?; + self.session_manager.shutdown().await; Ok(()) } @@ -268,14 +316,7 @@ impl Client { } pub(crate) async fn get_session(&self) -> Result { - if !self.is_started() { - return Err(ClientError::new( - ErrorKind::ClientIsNotRunning, - "client is not started", - OPERATION_GET_SESSION, - ) - .with_context("client_id", self.id.clone())); - } + self.check_started(OPERATION_GET_SESSION)?; let session = self .session_manager .get_or_create_session( @@ -348,8 +389,8 @@ impl Client { return Ok(route); } } - self.topic_route_inner(self.get_session().await.unwrap(), topic) - .await + let rpc_client = self.get_session().await?; + self.topic_route_inner(rpc_client, topic).await } async fn query_topic_route( @@ -461,15 +502,16 @@ impl Client { async fn heart_beat_inner( mut rpc_client: T, - group: &str, + group: &Option, namespace: &str, client_type: &ClientType, ) -> Result { + let group = group.as_ref().map(|group| Resource { + name: group.to_string(), + resource_namespace: namespace.to_string(), + }); let request = HeartbeatRequest { - group: Some(Resource { - name: group.to_string(), - resource_namespace: namespace.to_string(), - }), + group, client_type: client_type.clone() as i32, }; let response = rpc_client.heartbeat(request).await?; @@ -534,7 +576,7 @@ impl Client { ) -> Result, ClientError> { let request = ReceiveMessageRequest { group: Some(Resource { - name: self.option.group.to_string(), + name: self.option.group.as_ref().unwrap().to_string(), resource_namespace: self.option.namespace.to_string(), }), message_queue: Some(message_queue), @@ -594,7 +636,7 @@ impl Client { ) -> Result, ClientError> { let request = AckMessageRequest { group: Some(Resource { - name: self.option.group.to_string(), + name: self.option.group.as_ref().unwrap().to_string(), resource_namespace: self.option.namespace.to_string(), }), topic: Some(Resource { @@ -642,7 +684,10 @@ pub(crate) mod tests { fn new_client_for_test() -> Client { Client { logger: terminal_logger(), - option: ClientOption::default(), + option: ClientOption { + group: Some("group".to_string()), + ..Default::default() + }, session_manager: Arc::new(SessionManager::default()), route_table: Mutex::new(HashMap::new()), id: Client::generate_client_id(), @@ -650,6 +695,7 @@ pub(crate) mod tests { settings: TelemetryCommand::default(), transaction_checker: None, telemetry_command_tx: None, + shutdown_tx: None, } } @@ -665,6 +711,7 @@ pub(crate) mod tests { settings: TelemetryCommand::default(), transaction_checker: None, telemetry_command_tx: Some(tx), + shutdown_tx: None, } } @@ -714,7 +761,8 @@ pub(crate) mod tests { .expect_get_or_create_session() .returning(|_, _, _| Ok(Session::mock())); - let client = new_client_with_session_manager(session_manager); + let mut client = new_client_with_session_manager(session_manager); + let _ = client.start().await; let result = client.get_session().await; assert!(result.is_ok()); let result = client @@ -893,7 +941,9 @@ pub(crate) mod tests { mock.expect_heartbeat() .return_once(|_| Box::pin(futures::future::ready(response))); - let send_result = Client::heart_beat_inner(mock, "", "", &ClientType::Producer).await; + let send_result = + Client::heart_beat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer) + .await; assert!(send_result.is_ok()); } @@ -1034,7 +1084,7 @@ pub(crate) mod tests { let result = Client::handle_telemetry_command( mock, &Some(Box::new(|_, _| TransactionResolution::COMMIT)), - Endpoints::from_url("localhopst:8081").unwrap(), + Endpoints::from_url("localhost:8081").unwrap(), RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand { message: Some(Message { topic: Some(Resource::default()), diff --git a/rust/src/conf.rs b/rust/src/conf.rs index ea27d2798..fa16a41ae 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -29,7 +29,7 @@ use crate::simple_consumer::SimpleConsumer; #[derive(Debug, Clone)] pub struct ClientOption { pub(crate) client_type: ClientType, - pub(crate) group: String, + pub(crate) group: Option, pub(crate) namespace: String, pub(crate) access_url: String, pub(crate) enable_tls: bool, @@ -43,7 +43,7 @@ impl Default for ClientOption { fn default() -> Self { ClientOption { client_type: ClientType::Producer, - group: "".to_string(), + group: None, namespace: "".to_string(), access_url: "localhost:8081".to_string(), enable_tls: false, diff --git a/rust/src/producer.rs b/rust/src/producer.rs index d5e768c5e..7e3f39967 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -275,8 +275,9 @@ impl Producer { } let topic = message.take_topic(); let receipt = self.send(message).await?; + let rpc_client = self.client.get_session().await?; Ok(TransactionImpl::new( - Box::new(self.client.get_session().await.unwrap()), + Box::new(rpc_client), Resource { resource_namespace: self.option.namespace().to_string(), name: topic, @@ -284,6 +285,10 @@ impl Producer { receipt, )) } + + pub async fn shutdown(self) -> Result<(), ClientError> { + self.client.shutdown().await + } } #[cfg(test)] diff --git a/rust/src/session.rs b/rust/src/session.rs index 5762d6625..9441f7c55 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -22,7 +22,7 @@ use ring::hmac; use slog::{debug, error, info, o, Logger}; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::metadata::{AsciiMetadataValue, MetadataMap}; @@ -34,9 +34,9 @@ use crate::model::common::Endpoints; use crate::pb::telemetry_command::Command; use crate::pb::{ AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse, - HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse, - ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, - TelemetryCommand, + HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest, + NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest, + ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand, }; use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION}; use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient}; @@ -50,6 +50,7 @@ const OPERATION_SEND_MESSAGE: &str = "rpc.send_message"; const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message"; const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message"; const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction"; +const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination"; #[async_trait] #[automock] @@ -78,17 +79,36 @@ pub(crate) trait RPCClient { &mut self, request: EndTransactionRequest, ) -> Result; + async fn notify_shutdown( + &mut self, + request: NotifyClientTerminationRequest, + ) -> Result; } #[allow(dead_code)] -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct Session { logger: Logger, client_id: String, option: ClientOption, endpoints: Endpoints, stub: MessagingServiceClient, - telemetry_tx: Box>>, + telemetry_tx: Option>, + shutdown_tx: Option>, +} + +impl Clone for Session { + fn clone(&self) -> Self { + Session { + logger: self.logger.clone(), + client_id: self.client_id.clone(), + option: self.option.clone(), + endpoints: self.endpoints.clone(), + stub: self.stub.clone(), + telemetry_tx: None, + shutdown_tx: None, + } + } } impl Session { @@ -108,7 +128,8 @@ impl Session { stub: MessagingServiceClient::new( Channel::from_static("http://localhost:8081").connect_lazy(), ), - telemetry_tx: Box::new(None), + telemetry_tx: None, + shutdown_tx: None, } } @@ -159,7 +180,8 @@ impl Session { endpoints: endpoints.clone(), client_id, stub, - telemetry_tx: Box::new(None), + telemetry_tx: None, + shutdown_tx: None, }) } @@ -288,24 +310,35 @@ impl Session { .set_source(e) })?; + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + self.shutdown_tx = Some(shutdown_tx); + let logger = self.logger.clone(); tokio::spawn(async move { let mut stream = response.into_inner(); loop { - match stream.message().await { - Ok(Some(item)) => { - debug!(logger, "receive telemetry command: {:?}", item); - if let Some(command) = item.command { - _ = telemetry_command_tx.send(command).await; + tokio::select! { + message = stream.message() => { + match message { + Ok(Some(item)) => { + debug!(logger, "receive telemetry command: {:?}", item); + if let Some(command) = item.command { + _ = telemetry_command_tx.send(command).await; + } + } + Ok(None) => { + info!(logger, "telemetry command stream closed by server"); + break; + } + Err(e) => { + error!(logger, "telemetry response error: {:?}", e); + } } } - Ok(None) => { - debug!(logger, "request stream closed"); + _ = &mut shutdown_rx => { + info!(logger, "receive shutdown signal, stop dealing with telemetry command"); break; } - Err(e) => { - error!(logger, "telemetry response error: {:?}", e); - } } } }); @@ -314,9 +347,15 @@ impl Session { Ok(()) } + pub(crate) fn shutdown(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + } + #[allow(dead_code)] pub(crate) fn is_started(&self) -> bool { - self.telemetry_tx.is_some() + self.shutdown_tx.is_some() } #[allow(dead_code)] @@ -465,6 +504,26 @@ impl RPCClient for Session { })?; Ok(response.into_inner()) } + + async fn notify_shutdown( + &mut self, + request: NotifyClientTerminationRequest, + ) -> Result { + let request = self.sign(request); + let response = self + .stub + .notify_client_termination(request) + .await + .map_err(|e| { + ClientError::new( + ErrorKind::ClientInternal, + "send rpc notify_client_termination failed", + OPERATION_NOTIFY_CLIENT_TERMINATION, + ) + .set_source(e) + })?; + Ok(response.into_inner()) + } } #[derive(Debug)] @@ -521,6 +580,14 @@ impl SessionManager { } Ok(sessions) } + + pub(crate) async fn shutdown(&self) { + let mut session_map = self.session_map.lock().await; + for (_, session) in session_map.iter_mut() { + session.shutdown(); + } + session_map.clear(); + } } #[cfg(test)] diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index d4e222ea1..a87770530 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -67,7 +67,7 @@ impl SimpleConsumer { let client_option = ClientOption { client_type: ClientType::SimpleConsumer, - group: option.consumer_group().to_string(), + group: Some(option.consumer_group().to_string()), namespace: option.namespace().to_string(), ..client_option }; @@ -104,6 +104,10 @@ impl SimpleConsumer { Ok(()) } + pub async fn shutdown(self) -> Result<(), ClientError> { + self.client.shutdown().await + } + /// receive messages from the specified topic /// /// # Arguments @@ -115,7 +119,7 @@ impl SimpleConsumer { topic: impl AsRef, expression: &FilterExpression, ) -> Result, ClientError> { - self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15)) + self.receive_with(topic.as_ref(), expression, 32, Duration::from_secs(15)) .await } @@ -127,7 +131,7 @@ impl SimpleConsumer { /// * `expression` - the subscription for the topic /// * `batch_size` - max message num of server returned /// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout - pub async fn receive_with_batch_size( + pub async fn receive_with( &self, topic: impl AsRef, expression: &FilterExpression,