From 1b4aae45cbbd9fc0f6c49ba8a72203c501723b28 Mon Sep 17 00:00:00 2001 From: Alexsander Falcucci Date: Mon, 30 Oct 2023 14:38:57 +0100 Subject: [PATCH] implement GetStakeDistribution local state query w examples fixex #316 --- examples/n2c-miniprotocols/src/main.rs | 4 + pallas-network/tests/protocols.rs | 145 +++++++++++++++++++++++++ 2 files changed, 149 insertions(+) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 0f1b177b..84c397ec 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -80,10 +80,14 @@ async fn main() { let get_system_start_query = localstate::queries::Request::GetSystemStart; let get_epoch_query = localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo); + let get_stake_distribution_query = localstate::queries::Request::BlockQuery( + localstate::queries::BlockQuery::GetStakeDistribution, + ); // execute an arbitrary "Local State" query against the node do_localstate_query(&mut client, get_system_start_query).await; do_localstate_query(&mut client, get_epoch_query).await; + do_localstate_query(&mut client, get_stake_distribution_query).await; client.statequery().send_done().await.unwrap(); diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index e003fa86..38aa1297 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -545,6 +545,151 @@ pub async fn local_state_query_server_and_block_query_get_epoch_client_happy_pat _ = tokio::join!(client, server); } +#[tokio::test] +pub async fn local_state_query_server_and_block_query_get_stake_distribution_client_happy_path() { + let server = tokio::spawn({ + async move { + // server setup + let socket_path = Path::new("node.socket"); + + if socket_path.exists() { + fs::remove_file(socket_path).unwrap(); + } + + let unix_listener = UnixListener::bind(socket_path).unwrap(); + + let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap(); + + let mut server_plexer = Plexer::new(bearer); + + let mut server_hs: handshake::Server = + handshake::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE)); + + let mut server_sq: localstate::Server = + localstate::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY)); + + tokio::spawn(async move { server_plexer.run().await }); + + server_hs.receive_proposed_versions().await.unwrap(); + server_hs + .accept_version(10, n2c::VersionData::new(0, Some(false))) + .await + .unwrap(); + + // server receives range from client, sends blocks + + let ClientAcquireRequest(maybe_point) = + server_sq.recv_while_idle().await.unwrap().unwrap(); + + assert_eq!(maybe_point, Some(Point::Origin)); + assert_eq!(*server_sq.state(), localstate::State::Acquiring); + + // server_bf.send_block_range(bodies).await.unwrap(); + + server_sq.send_acquired().await.unwrap(); + + assert_eq!(*server_sq.state(), localstate::State::Acquired); + + // server receives query from client + + let query = match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::Query(q) => q, + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!( + query, + Request::BlockQuery(localstate::queries::BlockQuery::GetStakeDistribution) + ); + + assert_eq!(*server_sq.state(), localstate::State::Querying); + + server_sq + .send_result(GenericResponse::new( + hex::decode("8B188118BF1858181C031218EA188E183D18E11821").unwrap(), + )) + .await + .unwrap(); + + assert_eq!(*server_sq.state(), localstate::State::Acquired); + + // server receives reaquire from the client + + let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::ReAcquire(p) => p, + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3]))); + assert_eq!(*server_sq.state(), localstate::State::Acquiring); + + server_sq.send_acquired().await.unwrap(); + + // server receives release from the client + + match server_sq.recv_while_acquired().await.unwrap() { + ClientQueryRequest::Release => (), + x => panic!("unexpected message from client: {x:?}"), + }; + + assert!(server_sq.recv_while_idle().await.unwrap().is_none()); + + assert_eq!(*server_sq.state(), localstate::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + + let socket_path = "node.socket"; + + let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap(); + + let client_sq = client_to_server_conn.statequery(); + + // client sends acquire + + client_sq.send_acquire(Some(Point::Origin)).await.unwrap(); + + client_sq.recv_while_acquiring().await.unwrap(); + + assert_eq!(*client_sq.state(), localstate::State::Acquired); + + // client sends a BlockQuery + + client_sq + .send_query(Request::BlockQuery( + localstate::queries::BlockQuery::GetStakeDistribution, + )) + .await + .unwrap(); + + let resp = client_sq.recv_while_querying().await.unwrap(); + + assert_eq!( + resp, + GenericResponse::new( + hex::decode("8B188118BF1858181C031218EA188E183D18E11821").unwrap() + ) + ); + + client_sq + .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) + .await + .unwrap(); + + client_sq.recv_while_acquiring().await.unwrap(); + + client_sq.send_release().await.unwrap(); + + client_sq.send_done().await.unwrap(); + }); + + _ = tokio::join!(client, server); +} + #[tokio::test] #[ignore] pub async fn chainsync_server_and_client_happy_path_n2n() {