Skip to content

Commit

Permalink
add GetCurrentPParams local state query
Browse files Browse the repository at this point in the history
  • Loading branch information
falcucci committed Oct 30, 2023
1 parent 1b4aae4 commit 5be7382
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 1 deletion.
6 changes: 5 additions & 1 deletion examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use pallas::network::{
facades::NodeClient,
miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC},
miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC, PRE_PRODUCTION_MAGIC},
};
use tracing::info;

Expand Down Expand Up @@ -83,11 +83,15 @@ async fn main() {
let get_stake_distribution_query = localstate::queries::Request::BlockQuery(
localstate::queries::BlockQuery::GetStakeDistribution,
);
let get_current_protocol_params_query = localstate::queries::Request::BlockQuery(
localstate::queries::BlockQuery::GetCurrentPParams,
);

// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut client, get_system_start_query).await;
do_localstate_query(&mut client, get_epoch_query).await;
do_localstate_query(&mut client, get_stake_distribution_query).await;
do_localstate_query(&mut client, get_current_protocol_params_query).await;

client.statequery().send_done().await.unwrap();

Expand Down
148 changes: 148 additions & 0 deletions pallas-network/tests/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,154 @@ pub async fn local_state_query_server_and_block_query_get_stake_distribution_cli
_ = tokio::join!(client, server);
}

#[tokio::test]
pub async fn local_state_query_server_and_block_query_get_current_protocol_params_client_happy_path(
) {
let server = tokio::spawn({
async move {
// server setup
let socket_path = Path::new("node.socket");

if socket_path.exists() {
fs::remove_file(socket_path).unwrap();
}

let unix_listener = UnixListener::bind(socket_path).unwrap();

let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap();

let mut server_plexer = Plexer::new(bearer);

let mut server_hs: handshake::Server<n2c::VersionData> =
handshake::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE));

let mut server_sq: localstate::Server =
localstate::Server::new(server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY));

tokio::spawn(async move { server_plexer.run().await });

server_hs.receive_proposed_versions().await.unwrap();
server_hs
.accept_version(10, n2c::VersionData::new(0, Some(false)))
.await
.unwrap();

// server receives range from client, sends blocks

let ClientAcquireRequest(maybe_point) =
server_sq.recv_while_idle().await.unwrap().unwrap();

assert_eq!(maybe_point, Some(Point::Origin));
assert_eq!(*server_sq.state(), localstate::State::Acquiring);

// server_bf.send_block_range(bodies).await.unwrap();

server_sq.send_acquired().await.unwrap();

assert_eq!(*server_sq.state(), localstate::State::Acquired);

// server receives query from client

let query = match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::Query(q) => q,
x => panic!("unexpected message from client: {x:?}"),
};

assert_eq!(
query,
Request::BlockQuery(localstate::queries::BlockQuery::GetCurrentPParams)
);

assert_eq!(*server_sq.state(), localstate::State::Querying);

server_sq
.send_result(GenericResponse::new(
hex::decode("92188118971818182C181A0002185E18F5181A000118600018191840001819")
.unwrap(),
))
.await
.unwrap();

assert_eq!(*server_sq.state(), localstate::State::Acquired);

// server receives reaquire from the client

let maybe_point = match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::ReAcquire(p) => p,
x => panic!("unexpected message from client: {x:?}"),
};

assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3])));
assert_eq!(*server_sq.state(), localstate::State::Acquiring);

server_sq.send_acquired().await.unwrap();

// server receives release from the client

match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::Release => (),
x => panic!("unexpected message from client: {x:?}"),
};

assert!(server_sq.recv_while_idle().await.unwrap().is_none());

assert_eq!(*server_sq.state(), localstate::State::Done);
}
});

let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;

// client setup

let socket_path = "node.socket";

let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap();

let client_sq = client_to_server_conn.statequery();

// client sends acquire

client_sq.send_acquire(Some(Point::Origin)).await.unwrap();

client_sq.recv_while_acquiring().await.unwrap();

assert_eq!(*client_sq.state(), localstate::State::Acquired);

// client sends a BlockQuery

client_sq
.send_query(Request::BlockQuery(
localstate::queries::BlockQuery::GetCurrentPParams,
))
.await
.unwrap();

let resp = client_sq.recv_while_querying().await.unwrap();

assert_eq!(
resp,
GenericResponse::new(
hex::decode("92188118971818182C181A0002185E18F5181A000118600018191840001819")
.unwrap()
)
);

client_sq
.send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3])))
.await
.unwrap();

client_sq.recv_while_acquiring().await.unwrap();

client_sq.send_release().await.unwrap();

client_sq.send_done().await.unwrap();
});

_ = tokio::join!(client, server);
}

#[tokio::test]
#[ignore]
pub async fn chainsync_server_and_client_happy_path_n2n() {
Expand Down

0 comments on commit 5be7382

Please sign in to comment.