Dart StreamController.stream -> Rust #2195
Comments
Hi! Thanks for opening your first issue here! 😄 |
Currently I guess you can do something like:
// Dart
Stream<String> myStream = ...; // suppose this is your stream
myStream.listen(handle_my_stream_event);
// Rust
pub fn handle_my_stream_event(event: String) {
    // ...
}
Or, if you want the Rust side to be stateful, another simple example:
// Dart
Stream<String> myStream = ...; // suppose this is your stream
var a = MyRustSideWorker(); // suppose you create the rust object somewhere in the code
myStream.listen(a.handle_my_stream_event);
// Rust
pub struct MyRustSideWorker { ... }
impl MyRustSideWorker {
    pub fn handle_my_stream_event(&self, event: String) {
        // ...
    }
} |
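For the stateful variant above, the handler takes `&self`, so any state it updates needs interior mutability. A minimal sketch, assuming a `Mutex`-guarded buffer (the `received` field and the `received_count` helper are hypothetical and not part of the original suggestion):

```rust
use std::sync::Mutex;

/// Stateful worker: the handler takes `&self`, so the state lives behind a Mutex.
#[derive(Default)]
pub struct MyRustSideWorker {
    // Hypothetical field: stores every event forwarded from the Dart stream.
    received: Mutex<Vec<String>>,
}

impl MyRustSideWorker {
    /// Called once per event, e.g. from `myStream.listen(...)` on the Dart side.
    pub fn handle_my_stream_event(&self, event: String) {
        self.received
            .lock()
            .expect("worker mutex poisoned")
            .push(event);
    }

    /// Hypothetical helper: number of events seen so far.
    pub fn received_count(&self) -> usize {
        self.received.lock().expect("worker mutex poisoned").len()
    }
}
```

Each call locks only for the duration of the push, so handlers invoked from different threads do not race on the buffer.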
I mean something like this:
// Dart
StreamController<int> streamController = StreamController<int>();
final sink = streamController.sink;
final stream = streamController.stream;
final result = await rust_client_stream_func(stream);
sink.add(1);
sink.add(2);
sink.add(3);
sink.close();
// Rust
pub async fn rust_client_stream_func(mut stream: impl Stream<Item=i32> + Send + Unpin) -> Result<i32, String> {
    let mut sum = 0;
    while let Some(value) = stream.next().await {
        sum += value;
    }
    Ok(sum)
} |
I see. That looks implementable, and feel free to PR for this! Alternatively, I may work on it later. Currently, one simple workaround may be to use tokio channels (an mpsc channel if more than one value is sent, since a oneshot channel carries a single value). The sender is like the stream sink and the receiver is like the stream. I may think about it a bit more later. |
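The channel idea above can be sketched without any bridge code. This is only an illustration under assumptions: it uses `std::sync::mpsc` and a thread in place of tokio so that it has no dependencies, and `EventSink`, `sum_events`, and `run_demo` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the Dart-side sink: each `add` pushes one event.
pub struct EventSink(mpsc::Sender<i32>);

impl EventSink {
    pub fn add(&self, value: i32) -> Result<(), mpsc::SendError<i32>> {
        self.0.send(value)
    }
}

/// Plays the role of the stream consumer: reads until every sender is dropped,
/// then returns the sum of everything received.
pub fn sum_events(events: mpsc::Receiver<i32>) -> i32 {
    events.iter().sum()
}

/// Wires the two halves together the way the Dart snippet above does.
pub fn run_demo() -> i32 {
    let (tx, rx) = mpsc::channel();
    // Start the consumer first; it blocks until events arrive.
    let consumer = thread::spawn(move || sum_events(rx));
    let sink = EventSink(tx);
    for value in [1, 2, 3] {
        sink.add(value).expect("receiver should still be alive");
    }
    // Dropping the sink closes the channel, like `sink.close()` in Dart.
    drop(sink);
    consumer.join().expect("consumer thread panicked")
}
```

The consumer has to be running before the events are sent and must not be awaited until the sink is closed; otherwise the caller waits on a result that can only arrive after the close.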
Is there a way to convert these 3 Rust examples to Dart at the same time?
struct HelloRequest {
name: String,
}
struct HelloResponse {
message: String,
}
pub async fn rust_client_stream_func(mut stream: impl Stream<Item=HelloRequest> + Send + Unpin) -> Result<HelloResponse, String> {
while let Some(value) = stream.next().await {
println!("Received: {}", value.name);
}
Ok(HelloResponse {
message: "Hello".to_string()
})
}
pub async fn rust_server_stream_func(request: HelloRequest) -> Result<impl Stream<Item=HelloResponse>, Box<dyn std::error::Error>> {
    let interval = tokio::time::interval(Duration::from_secs(1));
    let name = request.name.clone();
    // Keep the counter in the unfold state: a counter captured by the closure
    // would be copied into each `async move` block and never advance.
    let stream = stream::unfold((interval, 0), move |(mut interval, count)| {
        let name = name.clone();
        async move {
            if count < 5 {
                interval.tick().await;
                Some((
                    HelloResponse {
                        message: format!("Hello, {}!", name),
                    },
                    (interval, count + 1),
                ))
            } else {
                None
            }
        }
    });
    Ok(stream)
}
pub async fn rust_bidirectional_stream_func(
mut in_stream: impl Stream<Item = HelloRequest> + Send + Unpin,
) -> Result<impl Stream<Item = HelloResponse>, Box<dyn std::error::Error>> {
let interval = tokio::time::interval(Duration::from_secs(1));
let out_stream = stream::unfold((interval, in_stream), move |(mut interval, mut stream)| async move {
if let Some(request) = stream.next().await {
interval.tick().await;
let response = HelloResponse {
message: format!("Hello, {}!", request.name),
};
Some((response, (interval, stream)))
} else {
None
}
});
Ok(out_stream)
}
#[tokio::test]
async fn test(){
use futures::stream;
let requests = stream::iter(vec![
HelloRequest { name: "Alice".to_string() },
HelloRequest { name: "Bob".to_string() },
HelloRequest { name: "Charlie".to_string() },
HelloRequest { name: "Dave".to_string() },
HelloRequest { name: "Eve".to_string() },
]);
let responses = rust_bidirectional_stream_func(requests).await.unwrap();
tokio::pin!(responses);
while let Some(response) = responses.next().await {
println!("{}", response.message);
}
} |
I created an RPC framework for Rust (still in development) and currently use proc_macro to generate code, hoping it can be used seamlessly in Dart. |
Firstly, if we add support for Streams, then this will be doable, and feel free to PR for this. Secondly, without that, a temporary workaround is roughly like:
// Rust
use tokio::sync::mpsc;
pub struct Dart2RustStreamSink(mpsc::Sender<HelloRequest>);
pub struct Dart2RustStream(mpsc::Receiver<HelloRequest>);
pub fn create_stream() -> (Dart2RustStreamSink, Dart2RustStream) {
    // The buffer size is arbitrary here.
    let (sender, receiver) = mpsc::channel(16);
    (Dart2RustStreamSink(sender), Dart2RustStream(receiver))
}
impl Dart2RustStreamSink {
    pub async fn add(&self, data: HelloRequest) {
        // Sending fails only if the receiver was dropped; ignore that case.
        let _ = self.0.send(data).await;
    }
}
pub async fn rust_client_stream_func(mut stream: Dart2RustStream) {
    while let Some(value) = stream.0.recv().await {
        println!("Received: {}", value.name);
    }
}
// Dart
Dart2RustStream createDart2RustStream(Stream<HelloRequest> pureDartStream) {
  var (dart2rustSink, dart2rustStream) = createStream();
  pureDartStream.listen(dart2rustSink.add);
  return dart2rustStream;
}
rust_client_stream_func(createDart2RustStream(yourExistingStream));
Since generics are not supported yet, we can use macros to generate this for each type (again, feel free to PR for generics, which would make this easier). And for a Rust -> Dart stream, check the docs. |
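The "use macros to generate for each type" suggestion could look roughly like the sketch below. It is an assumption-laden illustration, not the maintainer's code: `define_channel_stream!` and every generated name are hypothetical, and `std::sync::mpsc` replaces tokio to keep the example dependency-free.

```rust
use std::sync::mpsc;

/// Hypothetical macro: expands to a concrete (non-generic) sink/stream pair
/// for one item type, so no generic type has to cross the bridge.
macro_rules! define_channel_stream {
    ($sink:ident, $stream:ident, $create:ident, $item:ty) => {
        pub struct $sink(mpsc::Sender<$item>);
        pub struct $stream(mpsc::Receiver<$item>);

        impl $sink {
            /// Returns false once the receiving side has been dropped.
            pub fn add(&self, data: $item) -> bool {
                self.0.send(data).is_ok()
            }
        }

        impl $stream {
            /// Blocks for the next item; None once every sink is dropped.
            pub fn recv_next(&self) -> Option<$item> {
                self.0.recv().ok()
            }
        }

        pub fn $create() -> ($sink, $stream) {
            let (sender, receiver) = mpsc::channel();
            ($sink(sender), $stream(receiver))
        }
    };
}

// One invocation per item type that needs to travel from Dart to Rust.
define_channel_stream!(I32Sink, I32Stream, create_i32_stream, i32);
define_channel_stream!(StringSink, StringStream, create_string_stream, String);
```

Each invocation yields plain structs and functions with fixed types. Whether a code generator picks up items produced by `macro_rules!` depends on how it reads the source, so that part remains untested here.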
use anyhow::Result;
use std::{thread::sleep, time::Duration};
use crate::frb_generated::StreamSink;
const ONE_SECOND: Duration = Duration::from_secs(1);
// can't omit the return type yet, this is a bug
pub fn tick(sink: StreamSink<i32>) -> Result<()> {
let mut ticks = 0;
loop {
sink.add(ticks);
sleep(ONE_SECOND);
if ticks == i32::MAX {
break;
}
ticks += 1;
}
Ok(())
}
I checked the documentation about Rust -> Dart Stream, and there is a problem with this, that is, the For code with |
Yes, currently you may need to write a wrapper function, again perhaps with the help of Rust macros to avoid repetition. However, feel free to PR to support returning a |
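A wrapper of the kind mentioned above might look like this sketch. Everything in it is an assumption made for illustration: `EventSink` is a hypothetical trait standing in for the bridge's `StreamSink`, an iterator stands in for the stream-returning function, and `CollectingSink` exists only so the example runs on its own.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a bridge-provided sink type.
pub trait EventSink<T> {
    /// Returns false when the listener has gone away.
    fn add(&self, value: T) -> bool;
}

/// Test double that records everything it is given.
pub struct CollectingSink<T>(pub RefCell<Vec<T>>);

impl<T> EventSink<T> for CollectingSink<T> {
    fn add(&self, value: T) -> bool {
        self.0.borrow_mut().push(value);
        true
    }
}

/// The "stream-returning" function that cannot be exported directly.
pub fn greetings(name: &str) -> impl Iterator<Item = String> + '_ {
    (1..=3).map(move |i| format!("Hello, {name}! ({i})"))
}

/// Wrapper with a sink parameter instead of a stream return value.
/// Returns how many items were delivered.
pub fn greetings_into_sink(name: &str, sink: &impl EventSink<String>) -> usize {
    let mut sent = 0;
    for message in greetings(name) {
        if !sink.add(message) {
            break; // stop producing once nobody is listening
        }
        sent += 1;
    }
    sent
}
```

The wrapper adds no logic of its own; it only changes the shape of the signature, which is why it is a natural candidate for macro generation.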
I tried this:
use std::sync::Arc;
use rrpc_core::client::{Client, QuicClient, QuicConfig};
use rrpc_core::codec::BincodeCodec;
// mod hello{
// use serde::{Deserialize, Serialize};
// use rrpc_core::transport::*;
// use rrpc_core::client::{Client, QuicClient, QuicConfig};
// use rrpc_core::codec::{BincodeCodec, Codec};
// use futures::{Stream, StreamExt};
// use std::marker::PhantomData;
// #[derive(Debug, PartialEq, Serialize, Deserialize)]
// pub struct HelloRequest {
// pub name: String,
// pub id: i32,
// pub nicknames: Vec<String>,
// pub is_active: Option<bool>,
// pub attributes: std::collections::HashMap<String, i32>,
// pub nested: NestedMessage,
// pub aa: i8,
// }
// #[derive(Debug, PartialEq, Serialize, Deserialize)]
// pub struct HelloReply {
// pub message: String,
// pub responses: Vec<String>,
// }
// #[derive(Debug, PartialEq, Serialize, Deserialize)]
// pub struct NestedMessage {
// pub nested_field: String,
// }
//
// pub struct HelloService<'a, C: Client<T>, T: Codec> {
// client: &'a C,
// codec: PhantomData<T>,
// }
//
// impl<'a, C: Client<T>, T: Codec> HelloService<'a, C, T> {
// pub fn new(client: &'a C) -> Self {
// Self { client, codec: PhantomData }
// }
//
// pub async fn say_hello(
// &self,
// request: HelloRequest,
// ) -> Result<HelloReply, Box<dyn std::error::Error>> {
// let (mut send, recv) = self.client.open_transport().await;
// send.send_metadata(RequestContext::new(0u32, 0u32, 0)).await?;
// let mut send_unary = send.into_unary();
// send_unary.send(request).await?;
// let mut recv_unary = recv.into_unary();
// let response: HelloReply = recv_unary.recv().await?;
// return Ok(response);
// }
// pub async fn stream_server(
// &self,
// request: HelloRequest,
// ) -> Result<
// StreamReceiver<
// <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
// HelloReply,
// >,
// Box<dyn std::error::Error>,
// > {
// let (mut send, recv) = self.client.open_transport().await;
// send.send_metadata(RequestContext::new(0u32, 1u32, 0)).await?;
// let mut send_unary = send.into_unary();
// send_unary.send(request).await?;
// let recv_stream = recv.into_stream();
// let receiver = StreamReceiver::<
// <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
// HelloReply,
// >::new(recv_stream);
// Ok(receiver)
// }
//
// pub async fn stream_client(
// &self,
// mut client_stream: impl Stream<Item=HelloRequest> + Send + Unpin,
// ) -> Result<HelloReply, Box<dyn std::error::Error>> {
// let (mut send, recv) = self.client.open_transport().await;
// send.send_metadata(RequestContext::new(0u32, 2u32, 0)).await?;
// let send_stream = send.into_stream();
// let mut recv_unary_transport = recv.into_unary();
// let mut sender = StreamSender::<
// <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
// HelloRequest,
// >::new(send_stream);
// while let Some(request) = client_stream.next().await {
// sender.send(request).await?;
// }
// sender.finish().await?;
// let response: HelloReply = recv_unary_transport.recv().await?;
// Ok(response)
// }
//
// pub async fn stream_both(
// &self,
// ) -> Result<
// (
// StreamSender<
// <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
// HelloRequest,
// >,
// StreamReceiver<
// <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
// HelloReply,
// >,
// ),
// Box<dyn std::error::Error>,
// > {
// let (mut send, recv) = self.client.open_transport().await;
// send.send_metadata(RequestContext::new(0u32, 3u32, 0)).await?;
// let send_stream = send.into_stream();
// let recv_stream = recv.into_stream();
// let sender = StreamSender::<
// <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
// HelloRequest,
// >::new(send_stream);
// let receiver = StreamReceiver::<
// <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
// HelloReply,
// >::new(recv_stream);
// Ok((sender, receiver))
// }
// }
//
// }
mod hello{
rrpc_proc::proto_client!("hello.proto");
}
pub async fn new_hello_service_client() -> hello::HelloService<QuicClient<BincodeCodec>, BincodeCodec> {
let codec = Arc::new(BincodeCodec::new(bincode::config::standard()));
let client = QuicClient::connect(codec, "127.0.0.1:4433".parse().unwrap(), QuicConfig {
client_key_pem: include_bytes!("../cert/client.key").to_vec(),
client_cert_pem: include_bytes!("../cert/client.crt").to_vec(),
root_cert_pem: include_bytes!("../cert/ca.crt").to_vec(),
server_name: "127.0.0.1",
}).await.unwrap();
let hello_client = hello::HelloService::new(&client);
hello_client
}
The generated Dart code is as follows:
// This file is automatically generated, so please do not edit it.
// Generated by `flutter_rust_bridge`@ 2.1.0.
// ignore_for_file: invalid_use_of_internal_member, unused_import, unnecessary_import
import '../../frb_generated.dart';
import 'package:flutter_rust_bridge/flutter_rust_bridge_for_generated.dart';
// These types are ignored because they are not used by any `pub` functions: `HelloReply`, `HelloRequest`, `NestedMessage`
// These function are ignored because they are on traits that is not defined in current crate (put an empty `#[frb]` on it to unignore): `eq`, `eq`, `eq`, `fmt`, `fmt`, `fmt`
// These functions are ignored (category: IgnoreBecauseOwnerTyShouldIgnore): `new`, `say_hello`, `stream_both`, `stream_client`, `stream_server`
// Rust type: RustOpaqueMoi<flutter_rust_bridge::for_generated::RustAutoOpaqueInner< HelloService < QuicClient < BincodeCodec > , BincodeCodec >>>
abstract class HelloServiceQuicClientBincodeCodecBincodeCodec
implements RustOpaqueInterface {}
I want to export these functions. I noticed that using #[frb(external)] directly will export some irrelevant functions and properties. For example, for this struct, I only want it to have a function:
pub struct StreamReceiver<R: RecvStreamTransport, T: TypeDecode> {
recv_transport: R,
_marker: PhantomData<T>,
}
impl<R: RecvStreamTransport, T: TypeDecode> StreamReceiver<R, T> {
pub fn new(recv_transport: R) -> Self {
Self {
recv_transport,
_marker: PhantomData,
}
}
pub async fn recv(&mut self) -> Result<Option<T>, TransportError> {
match self.recv_transport.recv().await {
Ok(data) => Ok(Some(data)),
Err(TransportError::EOF) => Ok(None),
Err(e) => Err(e),
}
}
} |
Hmm, given the message, my first guess is that HelloService has complex generics, and generics are not yet supported by frb (feel free to PR!). |
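One way to sidestep the generics limitation described above is to hide the generic type behind a concrete wrapper that exposes only the wanted method. The sketch below is an assumption, not something confirmed to work with frb: `RecvTransport`, `QueueTransport`, and `HelloReplyReceiver` are simplified, hypothetical stand-ins for the types in the thread, and the code is synchronous to stay dependency-free.

```rust
use std::collections::VecDeque;
use std::marker::PhantomData;

/// Simplified, hypothetical transport trait (synchronous for the sketch).
pub trait RecvTransport<T> {
    fn recv(&mut self) -> Option<T>;
}

/// In-memory transport used only to make the example runnable.
pub struct QueueTransport<T>(pub VecDeque<T>);

impl<T> RecvTransport<T> for QueueTransport<T> {
    fn recv(&mut self) -> Option<T> {
        self.0.pop_front()
    }
}

/// Generic receiver, shaped like the `StreamReceiver` in the thread.
pub struct StreamReceiver<R: RecvTransport<T>, T> {
    recv_transport: R,
    _marker: PhantomData<T>,
}

impl<R: RecvTransport<T>, T> StreamReceiver<R, T> {
    pub fn new(recv_transport: R) -> Self {
        Self { recv_transport, _marker: PhantomData }
    }

    pub fn recv(&mut self) -> Option<T> {
        self.recv_transport.recv()
    }
}

/// Concrete facade: no type parameters, and `recv` is the only method
/// besides the constructor.
pub struct HelloReplyReceiver(StreamReceiver<QueueTransport<String>, String>);

impl HelloReplyReceiver {
    pub fn from_messages(messages: Vec<String>) -> Self {
        Self(StreamReceiver::new(QueueTransport(VecDeque::from(messages))))
    }

    pub fn recv(&mut self) -> Option<String> {
        self.0.recv()
    }
}
```

Because the facade fixes both type parameters, its public surface is a plain struct with plain methods. The cost is one wrapper per concrete combination, which again points toward macro generation.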
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
I need to implement a function that transmits a stream from Dart to Rust, such as a gRPC client stream. Is there a way to achieve this without using an additional "fn add(...)"?