Skip to content

Commit

Permalink
[ISSUE #575] Gracefully shutdown for Rust client (#576)
Browse files Browse the repository at this point in the history
* feat(rust): add gracefully shutdown for rust sdk

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): fix test

Signed-off-by: SSpirits <admin@lv5.moe>

* fix(rust): apply reviewer changes

Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Jul 25, 2023
1 parent 60115cf commit 4b99c03
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 106 deletions.
29 changes: 23 additions & 6 deletions rust/examples/delay_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
// It's recommended to specify the topics that applications will publish messages to
// because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["delay_test"]);

Expand All @@ -34,7 +34,11 @@ async fn main() {

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();
let start_result = producer.start().await;
if start_result.is_err() {
eprintln!("producer start failed: {:?}", start_result.unwrap_err());
return;
}

// build message
let message = MessageBuilder::delay_message_builder(
Expand All @@ -51,10 +55,23 @@ async fn main() {
.unwrap();

// send message to rocketmq proxy
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
let send_result = producer.send(message).await;
if send_result.is_err() {
eprintln!("send message failed: {:?}", send_result.unwrap_err());
return;
}
println!(
"send message success, message_id={}",
result.unwrap().message_id()
send_result.unwrap().message_id()
);

// shutdown the producer when you don't need it anymore.
// recommend shutdown manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
29 changes: 23 additions & 6 deletions rust/examples/fifo_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
// It's recommended to specify the topics that applications will publish messages to
// because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["fifo_test"]);

Expand All @@ -31,7 +31,11 @@ async fn main() {

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();
let start_result = producer.start().await;
if start_result.is_err() {
eprintln!("producer start failed: {:?}", start_result.unwrap_err());
return;
}

// build message
let message = MessageBuilder::fifo_message_builder(
Expand All @@ -44,10 +48,23 @@ async fn main() {
.unwrap();

// send message to rocketmq proxy
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
let send_result = producer.send(message).await;
if send_result.is_err() {
eprintln!("send message failed: {:?}", send_result.unwrap_err());
return;
}
println!(
"send message success, message_id={}",
result.unwrap().message_id()
send_result.unwrap().message_id()
);

// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
29 changes: 23 additions & 6 deletions rust/examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use rocketmq::Producer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
// It's recommended to specify the topics that applications will publish messages to
// because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["test_topic"]);

Expand All @@ -31,7 +31,11 @@ async fn main() {

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
producer.start().await.unwrap();
let start_result = producer.start().await;
if start_result.is_err() {
eprintln!("producer start failed: {:?}", start_result.unwrap_err());
return;
}

// build message
let message = MessageBuilder::builder()
Expand All @@ -42,10 +46,23 @@ async fn main() {
.unwrap();

// send message to rocketmq proxy
let result = producer.send(message).await;
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
let send_result = producer.send(message).await;
if send_result.is_err() {
eprintln!("send message failed: {:?}", send_result.unwrap_err());
return;
}
println!(
"send message success, message_id={}",
result.unwrap().message_id()
send_result.unwrap().message_id()
);

// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_ok() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
72 changes: 44 additions & 28 deletions rust/examples/simple_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use rocketmq::SimpleConsumer;

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// simple consumer will prefetch topic route when start and failed fast if topic not exist
// It's recommended to specify the topics that applications will publish messages to
// because the simple consumer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut consumer_option = SimpleConsumerOption::default();
consumer_option.set_topics(vec!["test_topic"]);
consumer_option.set_consumer_group("SimpleConsumerGroup");
Expand All @@ -33,38 +33,54 @@ async fn main() {

// build and start simple consumer
let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
consumer.start().await.unwrap();

loop {
// pop message from rocketmq proxy
let receive_result = consumer
.receive(
"test_topic".to_string(),
&FilterExpression::new(FilterType::Tag, "test_tag"),
)
.await;
debug_assert!(
receive_result.is_ok(),
"receive message failed: {:?}",
receive_result.unwrap_err()
let start_result = consumer.start().await;
if start_result.is_err() {
eprintln!(
"simple consumer start failed: {:?}",
start_result.unwrap_err()
);
return;
}

let messages = receive_result.unwrap();
// pop message from rocketmq proxy
let receive_result = consumer
.receive(
"test_topic".to_string(),
&FilterExpression::new(FilterType::Tag, "test_tag"),
)
.await;
if receive_result.is_err() {
eprintln!("receive message failed: {:?}", receive_result.unwrap_err());
return;
}

if messages.is_empty() {
println!("no message received");
return;
}
let messages = receive_result.unwrap();

for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(&message).await;
debug_assert!(
ack_result.is_ok(),
"ack message failed: {:?}",
if messages.is_empty() {
println!("no message received");
return;
}

for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(&message).await;
if ack_result.is_err() {
eprintln!(
"ack message {} failed: {:?}",
message.message_id(),
ack_result.unwrap_err()
);
}
}

// shutdown the simple consumer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = consumer.shutdown().await;
if shutdown_result.is_err() {
eprintln!(
"simple consumer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
25 changes: 21 additions & 4 deletions rust/examples/transaction_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ lazy_static::lazy_static! {

#[tokio::main]
async fn main() {
// recommend to specify which topic(s) you would like to send message to
// producer will prefetch topic route when start and failed fast if topic not exist
// It's recommended to specify the topics that applications will publish messages to
// because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["transaction_test"]);

Expand Down Expand Up @@ -62,7 +62,11 @@ async fn main() {
}),
)
.unwrap();
producer.start().await.unwrap();
let start_result = producer.start().await;
if start_result.is_err() {
eprintln!("producer start failed: {:?}", start_result.unwrap_err());
return;
}

// build message
let message = MessageBuilder::transaction_message_builder(
Expand Down Expand Up @@ -93,5 +97,18 @@ async fn main() {
// commit transaction manually
// delete following two lines so that RocketMQ server will check transaction status periodically
let result = transaction.commit().await;
debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
if result.is_err() {
eprintln!("commit transaction failed: {:?}", result.unwrap_err());
return;
}

// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_err() {
eprintln!(
"transaction producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
Loading

0 comments on commit 4b99c03

Please sign in to comment.