Skip to content
Sean Parent edited this page Apr 7, 2016 · 2 revisions
#include <stlab/channel.hpp>
#include <stlab/future.hpp>

#include <iostream>
#include <string>
#include <utility>
#include <vector>

using namespace std;
using namespace stlab;

/*
    sum is an example of an accumulating "co-routine". It will await for values, keeping an 
    internal sum, until the channel is closed and then it will yield the result as a string.
*/
struct sum {
    process_state _state = process_state::await;
    int _sum = 0;

    void await(int n) { _sum += n; }

    int yield() { _state = process_state::await; return _sum; }

    void close() { _state = process_state::yield; }

    auto state() const { return _state; }
};

int main() {
    /*
        Create a channel to aggregate our values.
    */
    sender<int> aggregate;
    receiver<int> receiver;
    tie(aggregate, receiver) = channel<int>();

    /*
        Create a vector to hold all the futures for each result as it is piped to channel.
        The future is of type <void> because the value is passed into the channel.
    */
    vector<stlab::future<void>> results;

    for (int n = 0; n != 10; ++n) {
        // Asynchronously generate a bunch of values.
        results.emplace_back(async(default_scheduler(), [_n = n]{ return _n; })
            // Then send those values into a copy of the channel
            .then([_aggregate = aggregate](int n) {
                _aggregate(n);
            }));
    }
    // Now it is safe to close (or destruct) this channel, all the copies remain open.
    aggregate.close();

    auto pipe = receiver
        /*
            The receiver is our common end point - we attach the vector of futures to it (another)
            inefficiency here - this is a lambda whose only purpose is to hold the vector of
            futures.
        */
        | [ _results = move(results) ](auto x){ return x; }
        // Then we can pipe the values to our accumulator
        | sum()
        // And pipe the final value to a lambda to print it.
        // Returning void from the pipe will mark it as ready.
        | [](auto x){ cout << x << endl; };

    receiver.set_ready(); // close this end of the pipe

    // Wait for everything to execute (just for demonstration)
    sleep(100);
}
Clone this wiki locally