Share running futures #81

Closed
tagantroy opened this issue May 12, 2021 · 3 comments

@tagantroy

Hello,
Thanks for this awesome crate!

My team uses this crate to cache the results of async functions and avoid expensive IO/computation. We run the processing in parallel, but caching only works for sequential requests. For example:

use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

use cached::proc_macro::cached;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::sleep;

static COUNTER: AtomicI64 = AtomicI64::new(0);

#[tokio::main]
async fn main() {
    let result: Vec<i64> = (0..=20) // 21 concurrent calls with the same input
        .map(|_| process_input("test".to_string())) // async processing
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<_>>()
        .await;
    println!("actual: {:?}", result);
    println!("expected: {:?}", vec![0; 21]);
}

#[cached]
async fn process_input(input: String) -> i64 {
    let data = COUNTER.fetch_add(1, Ordering::SeqCst);
    sleep(Duration::from_secs(2)).await; // simulate io/processing
    data
}

prints:

actual: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
expected: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Is this the desired behavior?

To work around this problem we added a wrapper function:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

use cached::proc_macro::cached;
use futures::future::{FutureExt, Shared};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::sleep;

static COUNTER: AtomicI64 = AtomicI64::new(0);

#[tokio::main]
async fn main() {
    let result: Vec<i64> = (0..=20) // 21 concurrent calls with the same input
        .map(|_| process_input_wrapper("test".to_string())) // async processing
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<_>>()
        .await;
    println!("actual: {:?}", result);
    println!("expected: {:?}", vec![0; 21]);
}

// Caches the shared future itself, so concurrent calls with the same
// key all await the first in-flight computation.
#[cached]
fn process_input_wrapper(input: String) -> Shared<Pin<Box<dyn Future<Output = i64> + Send>>> {
    process_input(input).boxed().shared()
}

async fn process_input(input: String) -> i64 {
    // heavy input processing
    let data = COUNTER.fetch_add(1, Ordering::SeqCst);
    sleep(Duration::from_secs(2)).await; // simulate io/processing
    data
}

prints:

actual: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
expected: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

What is the correct way to solve this? Is it possible to add support for sharing running futures to this crate?

@jaemk (Owner) commented May 12, 2021

I believe what you're looking for is an option that makes the cached macro lock the underlying cache per input key, so that concurrent calls with the same key happen synchronously? #62 is also asking for this. It looks like your wrapper gives you what you want, since you're caching the first future instead of caching each future's result. Support could definitely be added for this (I just don't have much bandwidth right now) - maybe a macro arg like synchronize_keys or something.
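
For reference, a minimal hand-rolled sketch of that pattern outside the macro: a cache of shared futures keyed by input, so concurrent callers with the same key await one in-flight computation. The FutureCache type and get_or_start helper are hypothetical names, not part of this crate:

use std::collections::HashMap;
use std::sync::Mutex;

use futures::future::{BoxFuture, FutureExt, Shared};

type SharedResult = Shared<BoxFuture<'static, i64>>;

// Hypothetical hand-rolled cache of in-flight/completed futures, keyed by input.
struct FutureCache {
    map: Mutex<HashMap<String, SharedResult>>,
}

impl FutureCache {
    fn new() -> Self {
        Self { map: Mutex::new(HashMap::new()) }
    }

    // Every caller with the same key gets a clone of one shared future,
    // so the underlying work runs at most once per key.
    fn get_or_start(&self, input: String) -> SharedResult {
        let mut map = self.map.lock().unwrap();
        map.entry(input.clone())
            .or_insert_with(|| process_input(input).boxed().shared())
            .clone()
    }
}

async fn process_input(input: String) -> i64 {
    // stand-in for heavy io/processing
    input.len() as i64
}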

@tagantroy (Author)

@jaemk thanks for the quick reply.
Yes, exactly as you described.
I'll try to implement it over the weekend.
Closing this issue as a duplicate of #62

@jaemk (Owner) commented Jan 23, 2022

@tagantroy I'm not sure where you ended up with this, but support for this was added a couple of months ago:

#62 (comment)

The latest version (0.26.1) adds an option sync_writes to the #[cached] macro to support this - see fb88d7f

So when you pass sync_writes = true, on a cache miss the shared lock is acquired before any computation happens to calculate and set the new value. For your example above, that means any number of your futures may see that the cache is empty, but only one can re-acquire the lock to compute and set the cache value. All remaining futures then either see the cached value, or finish re-acquiring the lock to do their computation but then see the cached value and return before re-computing.
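
With that option the manual wrapper should no longer be needed. A minimal sketch of the original example against cached 0.26.1 (assuming the same imports, COUNTER, and sleep as in the examples above):

use cached::proc_macro::cached;

// Concurrent calls with the same key wait for the first writer, so only
// one computation runs and the rest read the cached value.
#[cached(sync_writes = true)]
async fn process_input(input: String) -> i64 {
    let data = COUNTER.fetch_add(1, Ordering::SeqCst);
    sleep(Duration::from_secs(2)).await; // simulate io/processing
    data
}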
