Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retryWhen operator #781

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions reaktive/api/android/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/SampleKt {
public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable;
}
Expand Down Expand Up @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single;
}

public final class com/badoo/reaktive/single/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single;
}

public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source {
}

Expand Down
16 changes: 16 additions & 0 deletions reaktive/api/jvm/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/SampleKt {
public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable;
}
Expand Down Expand Up @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single;
}

public final class com/badoo/reaktive/single/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single;
}

public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.completable

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.asCompletable
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Completable] that automatically resubscribes to this [Completable] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#retryWhen-io.reactivex.functions.Function-).
*/
fun Completable.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Completable =
asObservable()
.retryWhen(handler)
.asCompletable()
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.maybe

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.firstOrComplete
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Maybe] that automatically resubscribes to this [Maybe] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Maybe<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Maybe<T> =
asObservable()
.retryWhen(handler)
.firstOrComplete()
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompleteCallback
import com.badoo.reaktive.base.ValueCallback
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.SerialDisposable
import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.subject.publish.PublishSubject
import com.badoo.reaktive.utils.atomic.AtomicBoolean

/**
* Returns an [Observable] that automatically resubscribes to this [Observable] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Observable<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Observable<T> =
observable { emitter ->
val disposables = CompositeDisposable()
emitter.setDisposable(disposables)

val errorSubject = PublishSubject<Throwable>()
val isError = AtomicBoolean()

val disposableObserver =
object : SerialDisposable(), ObservableObserver<T>, ValueCallback<T> by emitter, CompleteCallback by emitter {
override fun onSubscribe(disposable: Disposable) {
replace(disposable)
}

override fun onError(error: Throwable) {
replace(null)
isError.value = true
errorSubject.onNext(error)
}
}

disposables += disposableObserver

handler(errorSubject).subscribe(
object : ObservableObserver<Any?>, CompletableCallbacks by emitter {
override fun onSubscribe(disposable: Disposable) {
disposables += disposable
}

override fun onNext(value: Any?) {
if (isError.compareAndSet(true, false)) {
subscribe(disposableObserver)
}
}
}
)

subscribe(disposableObserver)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.single

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.firstOrError
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Single] that automatically resubscribes to this [Single] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Single<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Single<T> =
asObservable()
.retryWhen(handler)
.firstOrError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.badoo.reaktive.completable

import com.badoo.reaktive.observable.flatMap
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.retryWhen
import com.badoo.reaktive.test.base.assertError
import com.badoo.reaktive.test.base.assertNotError
import com.badoo.reaktive.test.base.hasSubscribers
import com.badoo.reaktive.test.completable.TestCompletable
import com.badoo.reaktive.test.completable.assertComplete
import com.badoo.reaktive.test.completable.test
import com.badoo.reaktive.test.observable.TestObservable
import com.badoo.reaktive.test.observable.TestObservableObserver
import com.badoo.reaktive.test.observable.assertValue
import com.badoo.reaktive.test.observable.test
import kotlin.test.Ignore
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue

class RetryWhenTest : CompletableToCompletableTests by CompletableToCompletableTestsImpl({ retryWhen { it } }) {

private val upstream = TestCompletable()
private val retryObservable = TestObservable<Any?>()

@Ignore
@Test
override fun disposes_downstream_disposable_WHEN_upstream_produced_error() {
// Not applicable
}

@Ignore
@Test
override fun produces_error_WHEN_upstream_produced_error() {
// Not applicable
}

@Test
fun calls_handler_WHEN_subscribed() {
var isCalled = false

upstream
.retryWhen {
isCalled = true
retryObservable
}
.test()

assertTrue(isCalled)
}

@Test
fun produces_error_WHEN_handler_throws() {
val exception = Exception()

val observer = upstream.retryWhen { throw exception }.test()

observer.assertError(exception)
}

@Test
fun subscribes_to_retry_observable_WHEN_subscribed() {
upstream.retryWhen { retryObservable }.test()

assertEquals(1, retryObservable.subscriptionCount)
}

@Test
fun disposes_retry_observable_WHEN_disposed() {
val observer = upstream.retryWhen { retryObservable }.test()

observer.dispose()

assertFalse(retryObservable.hasSubscribers)
}

@Test
fun disposes_retry_observable_WHEN_upstream_completed() {
upstream.retryWhen { retryObservable }.test()

upstream.onComplete()

assertFalse(retryObservable.hasSubscribers)
}

@Test
fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() {
upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertEquals(1, retryObservable.subscriptionCount)
}

@Test
fun does_not_produce_error_WHEN_upstream_produced_error() {
val observer = upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertNull(observer.error)
}

@Test
fun does_not_complete_WHEN_upstream_produced_error() {
val observer = upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertFalse(observer.isComplete)
}

@Test
fun error_observable_emits_exception_WHEN_upstream_produced_error() {
val exception = Exception()
lateinit var errorObserver: TestObservableObserver<Any?>

upstream
.retryWhen {
errorObserver = it.test()
retryObservable
}
.test()

upstream.onError(exception)

errorObserver.assertValue(exception)
}

@Test
fun completes_WHEN_retry_observable_completed() {
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onComplete()

observer.assertComplete()
}

@Test
fun produces_error_WHEN_retry_observable_produced_error() {
val exception = Exception()
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onError(exception)

observer.assertError(exception)
}

@Test
fun does_not_produce_error_WHEN_retry_observable_emitted_value() {
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onNext(1)

observer.assertNotError()
}

@Test
fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() {
upstream.retryWhen { retryObservable }.test()
upstream.onError(Exception())
upstream.reset()

retryObservable.onNext(1)

assertEquals(1, upstream.subscriptionCount)
}

@Test
fun completes_WHEN_resubscribed_after_error_and_upstream_completed() {
val observer = upstream.retryWhen { retryObservable }.test()
upstream.onError(Exception())
upstream.reset()
retryObservable.onNext(1)

upstream.onComplete()

observer.assertComplete()
}

@Test
fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() {
upstream
.retryWhen { errors ->
errors.flatMap {
upstream.reset()
observableOf(1, 2)
}
}.test()

upstream.onError(Exception())

assertEquals(1, upstream.subscriptionCount)
}
}
Loading