Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New API #9

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .envrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
use flake '.?submodules=1'
use flake '.?submodules=1' --override-input linera-protocol ./linera-protocol
22 changes: 12 additions & 10 deletions client-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@ license = "Apache-2.0"
crate-type = ["cdylib", "rlib"]

[dependencies]
wasm-bindgen = "0.2"
js-sys = "0.3"
wasm-bindgen-futures = "0.4"
console_error_panic_hook = "0.1.6"
anyhow = "1.0.82"
chrono = "0.4.38"
console_error_panic_hook = "0.1.6"
console_log = "1.0.0"
futures = "0.3.30"
js-sys = "0.3"
log = "0.4.21"
serde = "1.0.198"
rand = "0.8.5"
chrono = "0.4.38"
serde = "1.0.198"
serde-wasm-bindgen = "0.6.5"
serde_json = "1.0.116"
tokio = "1.40.0"
tracing = "0.1.40"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"

[dependencies.linera-base]
path = "../linera-protocol/linera-base"
Expand All @@ -34,17 +38,15 @@ features = ["web"]
[dependencies.linera-client]
path = "../linera-protocol/linera-client"
default-features = false
features = ["web", "local-storage"]
features = ["web", "indexed-db"]

[dependencies.linera-core]
path = "../linera-protocol/linera-core"
features = ["web", "wasmer"]

# TODO I'm not totally convinced this should be here — used just for
# some network config that I'm not sure belongs in `linera-execution`
[dependencies.linera-execution]
path = "../linera-protocol/linera-execution"
features = ["web"]
features = ["web", "wasmer"]

[dependencies.linera-rpc]
path = "../linera-protocol/linera-rpc"
Expand Down
222 changes: 222 additions & 0 deletions client-worker/src/chain_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use std::{collections::btree_map, sync::Arc};

use futures::{
future::{self, Either},
lock::Mutex,
StreamExt,
};
use linera_base::{
crypto::KeyPair,
data_types::Timestamp,
identifiers::{ChainId, Destination},
time::Duration,
};
use linera_chain::data_types::OutgoingMessage;
use linera_core::{
client::{ChainClient, ChainClientError},
node::{LocalValidatorNodeProvider, LocalValidatorNode, ValidatorNodeProvider},
worker::Reason,
};
use linera_execution::{Message, SystemMessage};
use linera_storage::{Clock as _, Storage};
use tracing::{debug, error, info, warn, Instrument as _};

use linera_client::{chain_clients::ChainClients, chain_listener::{ClientContext, ChainListenerConfig}, wallet::Wallet, Error};

/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<P, S>
where
S: Storage,
{
config: ChainListenerConfig,
clients: ChainClients<P, S>,
}

impl<P, S> ChainListener<P, S>
where
P: LocalValidatorNodeProvider + Send + Sync + 'static,
S: Storage + Clone + Send + Sync + 'static,
{
/// Creates a new chain listener given client chains.
pub fn new(config: ChainListenerConfig, clients: ChainClients<P, S>) -> Self {
Self { config, clients }
}

/// Runs the chain listener.
pub async fn run<C>(self, context: Arc<Mutex<C>>, storage: S)
where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let chain_ids = context.lock().await.wallet().chain_ids();
for chain_id in chain_ids {
Self::run_with_chain_id(
chain_id,
self.clients.clone(),
context.clone(),
storage.clone(),
self.config.clone(),
);
}
}

#[tracing::instrument(level = "trace", skip_all, fields(?chain_id))]
fn run_with_chain_id<C>(
chain_id: ChainId,
clients: ChainClients<P, S>,
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
) where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let _handle = linera_base::task::spawn(
async move {
if let Err(err) =
Self::run_client_stream(chain_id, clients, context, storage, config).await
{
error!("Stream for chain {} failed: {}", chain_id, err);
}
}
.in_current_span(),
);
}

#[tracing::instrument(level = "trace", skip_all, fields(?chain_id))]
async fn run_client_stream<C>(
chain_id: ChainId,
clients: ChainClients<P, S>,
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
) -> Result<(), Error>
where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let client = {
let mut map_guard = clients.map_lock().await;
let context_guard = context.lock().await;
let btree_map::Entry::Vacant(entry) = map_guard.entry(chain_id) else {
// For every entry in the client map we are already listening to notifications, so
// there's nothing to do. This can happen if we download a child before the parent
// chain, and then process the OpenChain message in the parent.
return Ok(());
};
let client = context_guard.make_chain_client(chain_id);
entry.insert(client.clone());
client
};
let (listener, _listen_handle, mut local_stream) = client.listen().await?;
client.synchronize_from_validators().await?;
drop(linera_base::task::spawn(listener.in_current_span()));
let mut timeout = storage.clock().current_time();
loop {
let sleep = Box::pin(storage.clock().sleep_until(timeout));
let notification = match future::select(local_stream.next(), sleep).await {
Either::Left((Some(notification), _)) => notification,
Either::Left((None, _)) => break,
Either::Right(((), _)) => {
if config.skip_process_inbox {
debug!("Not processing inbox due to listener configuration");
timeout = Timestamp::from(u64::MAX);
continue;
}
debug!("Processing inbox");
match client.process_inbox_without_prepare().await {
Err(ChainClientError::CannotFindKeyForChain(_)) => continue,
Err(error) => {
warn!(%error, "Failed to process inbox.");
timeout = Timestamp::from(u64::MAX);
}
Ok((certs, None)) => {
info!("Done processing inbox ({} blocks created)", certs.len());
timeout = Timestamp::from(u64::MAX);
}
Ok((certs, Some(new_timeout))) => {
info!("Done processing inbox ({} blocks created)", certs.len());
info!("I will try processing the inbox later based on the given round timeout: {:?}", new_timeout);
timeout = new_timeout.timestamp;
}
}
context.lock().await.update_wallet(&client).await?;
continue;
}
};
info!("Received new notification: {:?}", notification);
Self::maybe_sleep(config.delay_before_ms).await;
match &notification.reason {
Reason::NewIncomingBundle { .. } => timeout = storage.clock().current_time(),
Reason::NewBlock { .. } | Reason::NewRound { .. } => {
if let Err(error) = client.update_validators().await {
warn!(
"Failed to update validators about the local chain after \
receiving notification {:?} with error: {:?}",
notification, error
);
}
}
}
Self::maybe_sleep(config.delay_after_ms).await;
let Reason::NewBlock { hash, .. } = notification.reason else {
continue;
};
{
context.lock().await.update_wallet(&client).await?;
}
let value = storage.read_hashed_certificate_value(hash).await?;
let Some(executed_block) = value.inner().executed_block() else {
error!("NewBlock notification about value without a block: {hash}");
continue;
};
let new_chains = executed_block
.messages()
.iter()
.flatten()
.filter_map(|outgoing_message| {
if let OutgoingMessage {
destination: Destination::Recipient(new_id),
message: Message::System(SystemMessage::OpenChain(open_chain_config)),
..
} = outgoing_message
{
let keys = open_chain_config
.ownership
.all_public_keys()
.cloned()
.collect::<Vec<_>>();
let timestamp = executed_block.block.timestamp;
Some((*new_id, keys, timestamp))
} else {
None
}
})
.collect::<Vec<_>>();
if new_chains.is_empty() {
continue;
}
let mut context_guard = context.lock().await;
for (new_id, owners, timestamp) in new_chains {
let key_pair = owners
.iter()
.find_map(|public_key| context_guard.wallet().key_pair_for_pk(public_key));
context_guard
.update_wallet_for_new_chain(new_id, key_pair, timestamp)
.await?;
Self::run_with_chain_id(
new_id,
clients.clone(),
context.clone(),
storage.clone(),
config.clone(),
);
}
}
Ok(())
}

async fn maybe_sleep(delay_ms: u64) {
if delay_ms > 0 {
linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
}
}
}
Loading