Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/refactor_subscription' into Silk…
Browse files Browse the repository at this point in the history
…ovAlexander/fix_alternativ_params
  • Loading branch information
SilkovAlexander committed Oct 7, 2022
2 parents 6b8afcf + 437e081 commit 8f01f1f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 36 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 20 additions & 26 deletions src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* See the License for the specific TON DEV software governing permissions and
* limitations under the License.
*/
use std::sync::Arc;
use crate::helpers::{check_dir, create_client_verbose, json_account, print_account, query_account_field};
use crate::config::Config;
use serde_json::{json, Value};
Expand Down Expand Up @@ -302,17 +303,6 @@ pub async fn dump_accounts(config: &Config, addresses: Vec<String>, path: Option
Ok(())
}

lazy_static::lazy_static! {
static ref TX: tokio::sync::Mutex<Option<tokio::sync::mpsc::Sender<Result<(), String>>>> =
tokio::sync::Mutex::new(None);
}

async fn terminate(res: Result<(), String>) {
let lock = TX.lock().await;
let tx = lock.as_ref().unwrap().clone();
tx.send(res).await.unwrap();
}

fn extract_last_trans_lt(v: &serde_json::Value) -> Option<&str> {
v.as_object()?["last_trans_lt"].as_str()
}
Expand Down Expand Up @@ -340,22 +330,26 @@ pub async fn wait_for_change(config: &Config, account_address: &str, wait_secs:
.ok_or_else(|| format!("Failed to parse query result: {}", query.result[0]))?;

let (s, mut r) = tokio::sync::mpsc::channel(1);
*TX.lock().await = Some(s);
let s = Arc::new(s);

let callback = |result: Result<ResultOfSubscription, ClientError>| async {
let res = match result {
Ok(res) => {
if extract_last_trans_lt(&res.result).is_some() {
Ok(())
} else {
Err(format!("Can't parse the result: {}", res.result))
let ss = s.clone();
let callback = move |result: Result<ResultOfSubscription, ClientError>| {
let s = ss.clone();
async move {
let res = match result {
Ok(res) => {
if extract_last_trans_lt(&res.result).is_some() {
Ok(())
} else {
Err(format!("Can't parse the result: {}", res.result))
}
}
}
Err(e) => {
Err(format!("Client error: {}", e))
}
};
terminate(res).await
Err(e) => {
Err(format!("Client error: {}", e))
}
};
s.send(res).await.unwrap()
}
};

let subscription = ton_client::net::subscribe_collection(
Expand All @@ -378,7 +372,7 @@ pub async fn wait_for_change(config: &Config, account_address: &str, wait_secs:

tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(wait_secs)).await;
terminate(Err("Timeout".to_owned())).await
s.send(Err("Timeout".to_owned())).await.unwrap()
});

let res = r.recv().await.ok_or_else(|| "Sender has dropped".to_owned())?;
Expand Down

0 comments on commit 8f01f1f

Please sign in to comment.