Introduce Flow.all/any/none operators #4212

Open
CLOVIS-AI opened this issue Aug 12, 2024 · 3 comments

Comments

@CLOVIS-AI
Contributor

CLOVIS-AI commented Aug 12, 2024

Use case

I have a complex Flow that emits many elements. I have a business rule that is, literally, to do something if any of them satisfies a condition.

If I were using List or Set, I would use any { theCondition(it) }. However, Flow doesn't seem to have any.

There is an old issue (#2239) that asks for this feature, and is closed because of alternative implementations:

suspend fun <T : Any> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean {
    return this.firstOrNull { predicate(it) } != null
}

suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean {
    return this.count { !predicate(it) } == 0
}

I dislike this solution because:

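  • The first one doesn't work with a Flow<Foo?>: it has to be restricted to non-nullable types (T : Any), because a null result from firstOrNull cannot distinguish a matching null element from no match at all.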
  • Both options obfuscate the operation I am actually attempting to perform.

I believe it is worth having any/all/none directly in the library because the proposed implementations have downsides.

The Shape of the API

suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.none(predicate: suspend (T) -> Boolean): Boolean

Prior Art

These three functions are already available on Iterable and Sequence. The behavior should be the same.
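
For reference, here is a small sketch of the standard-library behavior the Flow operators would mirror, including the empty-collection cases and the early exit on a lazy Sequence. It uses only kotlin.collections and kotlin.sequences; nothing in it is part of the proposal itself.

```kotlin
fun main() {
    val empty = emptyList<Int>()
    println(empty.any { it > 0 })   // false: no element can match
    println(empty.all { it > 0 })   // true: vacuously true
    println(empty.none { it > 0 })  // true

    // On a lazy Sequence the traversal stops at the first decisive element,
    // so these calls terminate even though the sequence is infinite.
    val naturals = generateSequence(1) { it + 1 }
    println(naturals.any { it > 3 })   // true, decided at 4
    println(naturals.all { it < 3 })   // false, decided at 3
}
```

The Flow versions would be expected to give the same answers for the same elements and to stop collecting as soon as the result is known.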

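all and none could be built on the count operator, although as far as I can tell count collects the whole flow rather than stopping early, so a short-circuiting version would need its own implementation. I believe this is a possible implementation of any, though I haven't tested it yet: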

suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean) = this
    .transformWhile {
        if (predicate(it)) {
            emit(true)
            false // one element matches, no need to go further
        } else {
            true // continue
        }
    }
    .onEmpty { emit(false) }
    .first()
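
Assuming any short-circuits as intended, all and none can be derived from it by negation, which gives them the same early exit. The following is a minimal sketch under that assumption, not the library's implementation. The any body restates the transformWhile idea with an explicit return type, and kotlinx.coroutines.flow is imported name by name so that these local definitions are the ones being called.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Sketch of `any`: emit true and stop at the first match, fall back to false.
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean =
    transformWhile<T, Boolean> { value ->
        val matched = predicate(value)
        if (matched) emit(true)
        !matched // keep collecting only while nothing has matched
    }.onEmpty { emit(false) }.first()

// Derived by negation, so both stop at the first counterexample or match.
suspend fun <T> Flow<T>.none(predicate: suspend (T) -> Boolean): Boolean =
    !any(predicate)

suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean =
    !any { !predicate(it) }

fun main() = runBlocking {
    val values = flowOf(1, null, 3)      // Flow<Int?>: nullable elements are fine
    println(values.any { it == null })   // true
    println(values.all { it != null })   // false
    println(values.none { it == 2 })     // true
}
```

Because no T : Any bound is needed, this form also covers the Flow<Foo?> case that the firstOrNull workaround excludes.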

I can submit a PR if you think this is worth pursuing.

@CLOVIS-AI
Contributor Author

Simplified implementation:

suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean) = this
    .filter { predicate(it) }
    .map { true }
    .onEmpty { emit(false) }
    .first()
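
To check that this variant really stops early, one can count how many elements the upstream produces before any returns. The sketch below does that; countProduced is a hypothetical helper written only for this illustration, and the predicate is declared suspending to match the proposed signature.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.runBlocking

// The filter-based variant, with a suspending predicate.
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean =
    filter { predicate(it) }.map { true }.onEmpty { emit(false) }.first()

// Hypothetical helper: runs `any` over 1..1000 and reports the result
// together with the number of elements the upstream actually produced.
suspend fun countProduced(target: Int): Pair<Boolean, Int> {
    var produced = 0
    val source = flow {
        for (i in 1..1000) {
            produced++
            emit(i)
        }
    }
    val found = source.any { it == target }
    return found to produced
}

fun main() = runBlocking {
    println(countProduced(3))   // (true, 3): collection stopped at the first match
    println(countProduced(-1))  // (false, 1000): no match, whole flow consumed
}
```

The early exit comes from first(), which cancels the upstream collection once it has received its single value.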

@dkhalanskyjb
Collaborator

We had a Slack discussion about the use case which prompted this (https://kotlinlang.slack.com/archives/C1CFAFJSK/p1723480063142319).

What I took away from that discussion surprised me, but I now believe that Flow is applicable everywhere Sequence is applicable, meaning that we should add Sequence API to Flow on demand without asking for use cases.

The use case is completely linear, with few hints at asynchronous behavior. A list is taken, converted to a Flow, then map { aSuspendingFunction() } is performed, and last, any { } collects the result.
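
The shape of that use case can be sketched roughly as follows. loadStock and anyOutOfStock are hypothetical names invented for this illustration, and any is a local stand-in for the requested operator rather than a library function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.runBlocking

// Local stand-in for the requested operator.
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean =
    filter { predicate(it) }.map { true }.onEmpty { emit(false) }.first()

// Hypothetical suspending lookup, standing in for a network or database call.
suspend fun loadStock(itemId: Int): Int {
    delay(10)
    return itemId % 3 // toy rule: ids divisible by 3 have no stock
}

// A list is converted to a Flow, mapped with a suspending function,
// and the result is collected by `any`.
suspend fun anyOutOfStock(itemIds: List<Int>): Boolean =
    itemIds.asFlow()
        .map { loadStock(it) }
        .any { it == 0 }

fun main() = runBlocking {
    println(anyOutOfStock(listOf(1, 2, 3, 4))) // true
    println(anyOutOfStock(listOf(1, 2, 4)))    // false
}
```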

Conceptually, Sequence is a good choice here: the use case is not to create cold streams of values, it's to enable short-circuiting behavior while traversing the list as a sequence. Calling a suspending function in map, however, breaks this nice concept and forces Flow into the code, even if we attempt to utilize the sequence function that allows running suspend code:

  • Structured concurrency is broken if we call suspend fun → fun → suspend fun. The middle fun can be runBlocking, or it can be sequence { }.any { }; in any case, cancellation stops working unless you write bespoke code to preserve it.
  • The thread is hogged by fun if we call suspend fun → fun → suspend fun, so functions that call suspend functions are a bit safer to use if the middle fun becomes a suspend fun as well.
  • If we have suspend fun running in a Sequence, the fairly natural desire to actually make the code run asynchronously will not work, as Sequence lacks the necessary facilities. The edit distance for adding buffering to a Flow is tiny, but adding buffering to a Sequence means rewriting everything to Flow.
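
The point about edit distance can be illustrated with a small sketch: the two pipelines below differ by a single buffer() call. fetch and process are hypothetical slow steps, and the delays are arbitrary.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical slow producer and consumer steps.
suspend fun fetch(id: Int): Int { delay(50); return id }
suspend fun process(value: Int): Int { delay(50); return value * 2 }

// Everything runs in one coroutine: each fetch waits for the previous process.
suspend fun sequentialRun(ids: List<Int>): List<Int> =
    ids.asFlow().map { fetch(it) }.map { process(it) }.toList()

// Same pipeline plus one operator: fetch now runs in its own coroutine,
// concurrently with process, and element order is preserved.
suspend fun bufferedRun(ids: List<Int>): List<Int> =
    ids.asFlow().map { fetch(it) }.buffer().map { process(it) }.toList()

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4)
    val sequentialMs = measureTimeMillis { println(sequentialRun(ids)) }
    val bufferedMs = measureTimeMillis { println(bufferedRun(ids)) }
    println("sequential: $sequentialMs ms, buffered: $bufferedMs ms")
}
```

Both runs produce the same list; the buffered one should finish sooner because fetching and processing overlap. A Sequence-based version has no comparable one-line change.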

It seems to me like kotlinx.coroutines users should be able to use Flow whenever a Sequence is idiomatic, if only to break the suspend fun → fun → suspend fun call chain.

@qwwdfsad
Collaborator

qwwdfsad commented Aug 13, 2024

That's a really nice angle!

Originally, we thought of a Flow as an asynchronous cold (push-based) stream. It was never supposed to be a short-circuiting analogue of channels or a suspendable sequence. Thus, not only was operator parity not a concern, but we were also extremely cautious about adding new operators, as we had made this mistake with channel operators. That's why we asked for use cases even for the trivial operators (also, the idea of "Flow operators are trivial, you can write your own in a few lines of code" is still here). We wanted to minimize the API surface, minimize the potential for misuse, and nudge people to a sequence where they needed a sequence.

Yet the reality begs to differ -- among other things, Flow is indeed used as a suspendable sequence, people expect all the batteries to be included and, if something looks like a sequence, quacks like a sequence and walks like a sequence [in trivial scenarios], it's better to have operators like a sequence.

There are not really many upsides to keeping the status quo, I think.
