Dart StreamController.stream -> Rust #2195

Open
normalllll opened this issue Jul 10, 2024 · 12 comments
Labels
awaiting Waiting for responses, PR, further discussions, upstream release, etc enhancement New feature or request wontfix This will not be worked on

Comments

@normalllll

I need to implement a function that transmits a stream from Dart to Rust, such as a gRPC client stream. Is there a way to achieve this without using an additional "fn add(...)"?

@normalllll normalllll added the enhancement New feature or request label Jul 10, 2024

welcome bot commented Jul 10, 2024

Hi! Thanks for opening your first issue here! 😄

@fzyzcjy
Owner

fzyzcjy commented Jul 10, 2024

Currently I guess you can do something like:

// Dart (frb exposes Rust functions under camelCase names)
Stream<String> myStream = ...; // suppose this is your stream
myStream.listen(handleMyStreamEvent);

// Rust
pub fn handle_my_stream_event(event: String) {
  // ...
}

Or, if you want the Rust side to be stateful, another simple example:

// Dart
Stream<String> myStream = ...; // suppose this is your stream
var a = MyRustSideWorker(); // suppose you create the Rust object somewhere in the code
myStream.listen(a.handleMyStreamEvent);

// Rust
pub struct MyRustSideWorker { ... }

impl MyRustSideWorker {
  pub fn handle_my_stream_event(&self, event: String) {
    // ...
  }
}
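
To make the stateful variant above more concrete, here is a minimal sketch of what the Rust side could look like. The `received` field, the `new` constructor and the `event_count` method are illustrative assumptions, not part of the original suggestion; the point is only that a `&self` method needs interior mutability (here a `Mutex`) to keep state between events.

```rust
use std::sync::Mutex;

/// Hypothetical stateful worker; field and helper names are illustrative only.
pub struct MyRustSideWorker {
    // Interior mutability lets `&self` methods update the state.
    received: Mutex<Vec<String>>,
}

impl MyRustSideWorker {
    pub fn new() -> Self {
        Self { received: Mutex::new(Vec::new()) }
    }

    /// Called once per Dart stream event.
    pub fn handle_my_stream_event(&self, event: String) {
        self.received.lock().unwrap().push(event);
    }

    /// Number of events seen so far (assumed helper for inspection).
    pub fn event_count(&self) -> usize {
        self.received.lock().unwrap().len()
    }
}
```

Each call from `myStream.listen(...)` would then append to the same worker instance.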

@fzyzcjy fzyzcjy added the awaiting Waiting for responses, PR, further discussions, upstream release, etc label Jul 10, 2024
@normalllll
Author

I mean something like this:

  // Dart
  StreamController<int> streamController = StreamController<int>();
  final sink = streamController.sink;

  final stream = streamController.stream;

  // Start the call without awaiting it; awaiting here would block before any value is added.
  final resultFuture = rustClientStreamFunc(stream);

  sink.add(1);
  sink.add(2);
  sink.add(3);
  sink.close();

  final result = await resultFuture;

// Rust
use futures::{Stream, StreamExt};

pub async fn rust_client_stream_func(mut stream: impl Stream<Item=i32> + Send + Unpin) -> Result<i32, String> {
    let mut sum = 0;
    while let Some(value) = stream.next().await {
        sum += value;
    }
    Ok(sum)
}

@fzyzcjy
Owner

fzyzcjy commented Jul 11, 2024

I see. That looks implementable, and feel free to PR for this! Alternatively, I may work on it later.

Currently, one simple workaround may be to use tokio mpsc channels (a oneshot channel carries only a single value). The sender acts like a stream sink and the receiver acts like a stream, so rust_client_stream_func can take the receiver.

I may think about it later a bit more.
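
The sink/stream analogy can be sketched without any async runtime. The example below swaps in `std::sync::mpsc` and a thread in place of tokio channels and an async task, purely so that it is self-contained; the `demo` function is a hypothetical stand-in for the Dart side calling `add` and `close`.

```rust
use std::sync::mpsc;
use std::thread;

/// Sums every value sent through the channel; returns once all senders are dropped.
pub fn rust_client_stream_func(stream: mpsc::Receiver<i32>) -> i32 {
    let mut sum = 0;
    // Iterating a Receiver blocks for each value and stops when the channel closes.
    for value in stream {
        sum += value;
    }
    sum
}

/// Plays the role of the Dart side: add 1, 2, 3, then close the sink.
pub fn demo() -> i32 {
    let (sink, stream) = mpsc::channel();
    // The consumer runs concurrently, like a Rust function awaiting stream items.
    let consumer = thread::spawn(move || rust_client_stream_func(stream));
    for v in 1..=3 {
        sink.send(v).unwrap();
    }
    drop(sink); // the equivalent of `sink.close()`
    consumer.join().unwrap()
}
```

Dropping the sender is what ends the loop on the receiving side, which mirrors how closing a Dart sink ends the stream.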

@normalllll
Author

Is there a way to convert these 3 Rust examples to Dart at the same time?

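use std::time::Duration;
use futures::{stream, Stream, StreamExt};

// Public, because both types appear in the signatures of `pub` functions.
pub struct HelloRequest {
    pub name: String,
}

pub struct HelloResponse {
    pub message: String,
}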

pub async fn rust_client_stream_func(mut stream: impl Stream<Item=HelloRequest> + Send + Unpin) -> Result<HelloResponse, String> {
    while let Some(value) = stream.next().await {
        println!("Received: {}", value.name);
    }

    Ok(HelloResponse {
        message: "Hello".to_string()
    })
}


pub async fn rust_server_stream_func(request: HelloRequest) -> Result<impl Stream<Item=HelloResponse>, Box<dyn std::error::Error>> {
    let interval = tokio::time::interval(Duration::from_secs(1));
    let name = request.name;

    // Keep the counter in the unfold state: a captured `count` would be copied into
    // each `async move` block, so incrementing it there would never end the stream.
    let stream = stream::unfold((interval, 0), move |(mut interval, count)| {
        let name = name.clone();
        async move {
            if count < 5 {
                interval.tick().await;
                Some((
                    HelloResponse {
                        message: format!("Hello, {}!", name),
                    },
                    (interval, count + 1),
                ))
            } else {
                None
            }
        }
    });

    Ok(stream)
}

pub async fn rust_bidirectional_stream_func(
    in_stream: impl Stream<Item = HelloRequest> + Send + Unpin,
) -> Result<impl Stream<Item = HelloResponse>, Box<dyn std::error::Error>> {
    let interval = tokio::time::interval(Duration::from_secs(1));

    let out_stream = stream::unfold((interval, in_stream), move |(mut interval, mut stream)| async move {
        if let Some(request) = stream.next().await {
            interval.tick().await;
            let response = HelloResponse {
                message: format!("Hello, {}!", request.name),
            };
            Some((response, (interval, stream)))
        } else {
            None
        }
    });

    Ok(out_stream)
}

#[tokio::test]
async fn test(){
    use futures::stream;
    let requests = stream::iter(vec![
        HelloRequest { name: "Alice".to_string() },
        HelloRequest { name: "Bob".to_string() },
        HelloRequest { name: "Charlie".to_string() },
        HelloRequest { name: "Dave".to_string() },
        HelloRequest { name: "Eve".to_string() },
    ]);

    let responses = rust_bidirectional_stream_func(requests).await.unwrap();
    tokio::pin!(responses);

    while let Some(response) = responses.next().await {
        println!("{}", response.message);
    }
}

@normalllll
Author

I created an RPC framework for Rust (still in development) and currently use proc_macro to generate code, hoping it can be used seamlessly in Dart.

@fzyzcjy
Owner

fzyzcjy commented Jul 11, 2024

Firstly, if we add support for Streams, then this will be doable, and feel free to PR for this.

Secondly, without that, a temporary workaround is roughly like:

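// Rust (written for one concrete item type, since generics are not supported)
use tokio::sync::mpsc;

pub struct Dart2RustStreamSink(mpsc::UnboundedSender<HelloRequest>);
pub struct Dart2RustStream(mpsc::UnboundedReceiver<HelloRequest>);

pub fn create_stream() -> (Dart2RustStreamSink, Dart2RustStream) {
    let (sender, receiver) = mpsc::unbounded_channel();
    (Dart2RustStreamSink(sender), Dart2RustStream(receiver))
}

impl Dart2RustStreamSink {
    pub fn add(&self, data: HelloRequest) {
        // Sending only fails when the receiver is gone; ignore that case here.
        let _ = self.0.send(data);
    }
}

pub async fn rust_client_stream_func(mut stream: Dart2RustStream) {
    // `recv` yields `None`, ending the loop, once the sink has been dropped.
    while let Some(value) = stream.0.recv().await {
        println!("Received: {}", value.name);
    }
}

// Dart
Future<Dart2RustStream> createDart2RustStream(Stream<HelloRequest> pureDartStream) async {
  final (dart2rustSink, dart2rustStream) = await createStream();
  pureDartStream.listen(dart2rustSink.add);
  return dart2rustStream;
}

await rustClientStreamFunc(await createDart2RustStream(yourExistingStream));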

Since generics are not supported yet, we can use macros to generate this for each type. (Again, feel free to PR for generics, which would make this easier to do.)
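
As a rough illustration of the macro idea, the sketch below generates a non-generic sink/stream pair per item type with `macro_rules!`. The macro name `dart2rust_stream!` and all generated names are hypothetical, and `std::sync::mpsc` is used instead of tokio channels only to keep the example self-contained. Whether frb picks up macro-generated items in a given project setup is an assumption that would need checking.

```rust
use std::sync::mpsc;

/// Generates a non-generic sink/stream pair for one concrete item type.
/// All names passed in are chosen by the caller (hypothetical helper macro).
macro_rules! dart2rust_stream {
    ($sink:ident, $stream:ident, $create:ident, $item:ty) => {
        pub struct $sink(mpsc::Sender<$item>);
        pub struct $stream(mpsc::Receiver<$item>);

        pub fn $create() -> ($sink, $stream) {
            let (tx, rx) = mpsc::channel();
            ($sink(tx), $stream(rx))
        }

        impl $sink {
            /// Returns false if the receiving side has gone away.
            pub fn add(&self, data: $item) -> bool {
                self.0.send(data).is_ok()
            }
        }

        impl $stream {
            /// Blocks for the next item; `None` once every sink is dropped.
            pub fn recv(&self) -> Option<$item> {
                self.0.recv().ok()
            }
        }
    };
}

// One invocation per item type that needs a Dart -> Rust stream.
dart2rust_stream!(I32Sink, I32Stream, create_i32_stream, i32);
dart2rust_stream!(StringSink, StringStream, create_string_stream, String);
```

Each invocation produces plain, concrete types, so no generic parameter appears in the exported API.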

And for Rust -> Dart streams, check the doc for that.

@normalllll
Author

use anyhow::Result;
use std::{thread::sleep, time::Duration};

use crate::frb_generated::StreamSink;

const ONE_SECOND: Duration = Duration::from_secs(1);

// can't omit the return type yet, this is a bug
pub fn tick(sink: StreamSink<i32>) -> Result<()> {
    let mut ticks = 0;
    loop {
        sink.add(ticks);
        sleep(ONE_SECOND);
        if ticks == i32::MAX {
            break;
        }
        ticks += 1;
    }
    Ok(())
}

I checked the documentation about Rust -> Dart streams, and there is a problem: the StreamSink from frb_generated must be used, and it has to be passed as a parameter.

Code generated by an external library that returns Result<impl Stream<...>, ...> cannot be processed automatically. It seems that additional wrapper code must be written by hand.

@fzyzcjy
Owner

fzyzcjy commented Jul 13, 2024

Yes, currently you may need to write a wrapper function, again maybe with the help of Rust macros to avoid repetition. However, feel free to PR to support returning an impl Stream! I may work on it later, but cannot guarantee it since this may not be of very high priority.
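
The shape of such a wrapper can be sketched with stand-ins. In the example below, `EventSink` is a hypothetical trait playing the role of frb's `StreamSink<T>`, `external_greetings` stands in for an external-library function returning a stream, and a synchronous `Iterator` replaces an async `Stream` so the code runs without extra crates. Only the pattern (drain the returned stream into a sink passed as a parameter) is the point.

```rust
/// Stand-in for a sink type such as frb's `StreamSink<T>` (hypothetical trait).
pub trait EventSink<T> {
    fn add(&mut self, value: T);
}

/// A Vec collects the items, which is convenient for trying the wrapper out.
impl<T> EventSink<T> for Vec<T> {
    fn add(&mut self, value: T) {
        self.push(value);
    }
}

/// Stand-in for an external-library function that returns a stream of items.
pub fn external_greetings(name: &str) -> Result<impl Iterator<Item = String>, String> {
    let name = name.to_string();
    Ok((1..=3).map(move |i| format!("Hello, {}! ({})", name, i)))
}

/// Wrapper with the sink as a parameter: forwards every item of the returned stream.
pub fn greetings_into_sink(
    name: &str,
    sink: &mut impl EventSink<String>,
) -> Result<(), String> {
    for item in external_greetings(name)? {
        sink.add(item);
    }
    Ok(())
}
```

With a real async stream, the loop would become `while let Some(item) = stream.next().await`, and a macro could stamp out one such wrapper per function.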

@normalllll
Author

normalllll commented Jul 19, 2024

I tried this:

use std::sync::Arc;
use rrpc_core::client::{Client, QuicClient, QuicConfig};
use rrpc_core::codec::BincodeCodec;

// mod hello{
//     use serde::{Deserialize, Serialize};
//     use rrpc_core::transport::*;
//     use rrpc_core::client::{Client, QuicClient, QuicConfig};
//     use rrpc_core::codec::{BincodeCodec, Codec};
//     use futures::{Stream, StreamExt};
//     use std::marker::PhantomData;
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct HelloRequest {
//         pub name: String,
//         pub id: i32,
//         pub nicknames: Vec<String>,
//         pub is_active: Option<bool>,
//         pub attributes: std::collections::HashMap<String, i32>,
//         pub nested: NestedMessage,
//         pub aa: i8,
//     }
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct HelloReply {
//         pub message: String,
//         pub responses: Vec<String>,
//     }
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct NestedMessage {
//         pub nested_field: String,
//     }
//
//     pub struct HelloService<'a, C: Client<T>, T: Codec> {
//         client: &'a C,
//         codec: PhantomData<T>,
//     }
//
//     impl<'a, C: Client<T>, T: Codec> HelloService<'a, C, T> {
//         pub fn new(client: &'a C) -> Self {
//             Self { client, codec: PhantomData }
//         }
//
//         pub async fn say_hello(
//             &self,
//             request: HelloRequest,
//         ) -> Result<HelloReply, Box<dyn std::error::Error>> {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 0u32, 0)).await?;
//             let mut send_unary = send.into_unary();
//             send_unary.send(request).await?;
//             let mut recv_unary = recv.into_unary();
//             let response: HelloReply = recv_unary.recv().await?;
//             return Ok(response);
//         }
//         pub async fn stream_server(
//             &self,
//             request: HelloRequest,
//         ) -> Result<
//             StreamReceiver<
//                 <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
//                 HelloReply,
//             >,
//             Box<dyn std::error::Error>,
//         > {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 1u32, 0)).await?;
//             let mut send_unary = send.into_unary();
//             send_unary.send(request).await?;
//             let recv_stream = recv.into_stream();
//             let receiver = StreamReceiver::<
//                 <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
//                 HelloReply,
//             >::new(recv_stream);
//             Ok(receiver)
//         }
//
//         pub async fn stream_client(
//             &self,
//             mut client_stream: impl Stream<Item=HelloRequest> + Send + Unpin,
//         ) -> Result<HelloReply, Box<dyn std::error::Error>> {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 2u32, 0)).await?;
//             let send_stream = send.into_stream();
//             let mut recv_unary_transport = recv.into_unary();
//             let mut sender = StreamSender::<
//                 <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
//                 HelloRequest,
//             >::new(send_stream);
//             while let Some(request) = client_stream.next().await {
//                 sender.send(request).await?;
//             }
//             sender.finish().await?;
//             let response: HelloReply = recv_unary_transport.recv().await?;
//             Ok(response)
//         }
//
//         pub async fn stream_both(
//             &self,
//         ) -> Result<
//             (
//                 StreamSender<
//                     <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
//                     HelloRequest,
//                 >,
//                 StreamReceiver<
//                     <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
//                     HelloReply,
//                 >,
//             ),
//             Box<dyn std::error::Error>,
//         > {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 3u32, 0)).await?;
//             let send_stream = send.into_stream();
//             let recv_stream = recv.into_stream();
//             let sender = StreamSender::<
//                 <C::SendTransport as IntoSendStreamTransport>::StreamTarget,
//                 HelloRequest,
//             >::new(send_stream);
//             let receiver = StreamReceiver::<
//                 <C::RecvTransport as IntoRecvStreamTransport>::StreamTarget,
//                 HelloReply,
//             >::new(recv_stream);
//             Ok((sender, receiver))
//         }
//     }
//
// }

mod hello{
    rrpc_proc::proto_client!("hello.proto");
}

pub async fn new_hello_service_client() -> hello::HelloService<QuicClient<BincodeCodec>, BincodeCodec> {
    let codec = Arc::new(BincodeCodec::new(bincode::config::standard()));
    let client = QuicClient::connect(codec, "127.0.0.1:4433".parse().unwrap(), QuicConfig {
        client_key_pem: include_bytes!("../cert/client.key").to_vec(),
        client_cert_pem: include_bytes!("../cert/client.crt").to_vec(),
        root_cert_pem: include_bytes!("../cert/ca.crt").to_vec(),
        server_name: "127.0.0.1",
    }).await.unwrap();


    let hello_client = hello::HelloService::new(&client);
    hello_client
}

The generated Dart code is as follows:

// This file is automatically generated, so please do not edit it.
// Generated by `flutter_rust_bridge`@ 2.1.0.

// ignore_for_file: invalid_use_of_internal_member, unused_import, unnecessary_import

import '../../frb_generated.dart';
import 'package:flutter_rust_bridge/flutter_rust_bridge_for_generated.dart';

// These types are ignored because they are not used by any `pub` functions: `HelloReply`, `HelloRequest`, `NestedMessage`
// These function are ignored because they are on traits that is not defined in current crate (put an empty `#[frb]` on it to unignore): `eq`, `eq`, `eq`, `fmt`, `fmt`, `fmt`
// These functions are ignored (category: IgnoreBecauseOwnerTyShouldIgnore): `new`, `say_hello`, `stream_both`, `stream_client`, `stream_server`

// Rust type: RustOpaqueMoi<flutter_rust_bridge::for_generated::RustAutoOpaqueInner< HelloService < QuicClient < BincodeCodec > , BincodeCodec >>>
abstract class HelloServiceQuicClientBincodeCodecBincodeCodec
    implements RustOpaqueInterface {}

I want to export these functions say_hello, stream_both, stream_client, stream_server. How can I do this?

I noticed that using #[frb(external)] directly will export some irrelevant functions and properties.

For example, for this struct, I only want to export the function recv; I don't need the fields recv_transport and _marker or the function new.

pub struct StreamReceiver<R: RecvStreamTransport, T: TypeDecode> {
    recv_transport: R,
    _marker: PhantomData<T>,
}

impl<R: RecvStreamTransport, T: TypeDecode> StreamReceiver<R, T> {
    pub fn new(recv_transport: R) -> Self {
        Self {
            recv_transport,
            _marker: PhantomData,
        }
    }

    pub async fn recv(&mut self) -> Result<Option<T>, TransportError> {
        match self.recv_transport.recv().await {
            Ok(data) => Ok(Some(data)),
            Err(TransportError::EOF) => Ok(None),
            Err(e) => Err(e),
        }
    }
}

@fzyzcjy
Owner

fzyzcjy commented Jul 19, 2024

Hmm, given the message, my first guess is that HelloService has complex generics, but generics are not yet supported by frb (feel free to PR!).
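
One general Rust technique that may help while generics are unsupported is to hide a concrete instantiation behind a non-generic wrapper that exposes only the wanted methods. This is a sketch under assumptions, not something confirmed in this thread: the trait definitions, `VecTransport`, and `HelloReplyReceiver` below are simplified, hypothetical stand-ins, and `recv` is synchronous here to avoid needing an async runtime.

```rust
use std::marker::PhantomData;

/// Minimal stand-ins for the traits named in the thread (hypothetical definitions).
pub trait RecvStreamTransport {
    fn recv_bytes(&mut self) -> Option<Vec<u8>>;
}
pub trait TypeDecode: Sized {
    fn decode(bytes: &[u8]) -> Self;
}

pub struct StreamReceiver<R: RecvStreamTransport, T: TypeDecode> {
    recv_transport: R,
    _marker: PhantomData<T>,
}

impl<R: RecvStreamTransport, T: TypeDecode> StreamReceiver<R, T> {
    pub fn new(recv_transport: R) -> Self {
        Self { recv_transport, _marker: PhantomData }
    }
    pub fn recv(&mut self) -> Option<T> {
        self.recv_transport.recv_bytes().map(|b| T::decode(&b))
    }
}

/// Concrete transport and message type for one instantiation (illustrative).
pub struct VecTransport(pub Vec<Vec<u8>>);
impl RecvStreamTransport for VecTransport {
    fn recv_bytes(&mut self) -> Option<Vec<u8>> {
        if self.0.is_empty() { None } else { Some(self.0.remove(0)) }
    }
}
pub struct HelloReply {
    pub message: String,
}
impl TypeDecode for HelloReply {
    fn decode(bytes: &[u8]) -> Self {
        HelloReply { message: String::from_utf8_lossy(bytes).into_owned() }
    }
}

/// Non-generic wrapper: the inner field is private and `recv` is the only inherent method.
pub struct HelloReplyReceiver(StreamReceiver<VecTransport, HelloReply>);

impl HelloReplyReceiver {
    pub fn recv(&mut self) -> Option<HelloReply> {
        self.0.recv()
    }
}

/// Construction goes through a trait impl instead of an inherent `new`.
impl From<StreamReceiver<VecTransport, HelloReply>> for HelloReplyReceiver {
    fn from(inner: StreamReceiver<VecTransport, HelloReply>) -> Self {
        HelloReplyReceiver(inner)
    }
}
```

The wrapper has no type parameters and no public fields, so only `recv` is part of its inherent API; the same idea would apply to a concrete `HelloService` wrapper.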


stale bot commented Sep 19, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix This will not be worked on label Sep 19, 2024