diff --git a/CHANGELOG.md b/CHANGELOG.md index 4472aca98..410164a10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Pending changes -– +- [#289](https://github.com/bumble-tech/appyx/issues/289) – **Added**: Introduced `interop-rx3` for RxJava 3 support. This has identical functionality to `interop-rx2`. --- diff --git a/documentation/releases/downloads.md b/documentation/releases/downloads.md index 5a47d1e3d..322f482d0 100644 --- a/documentation/releases/downloads.md +++ b/documentation/releases/downloads.md @@ -38,8 +38,9 @@ dependencies { ```groovy dependencies { - // Optional support for RxJava 2 + // Optional support for RxJava 2/3 implementation "com.bumble.appyx:interop-rx2:$version" + implementation "com.bumble.appyx:interop-rx3:$version" // Optional interoperability layer between Appyx and badoo/RIBs // You have to add https://jitpack.io repository to use it because badoo/RIBs is hosted there diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e346e0a80..386f83c26 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,8 +13,6 @@ ribs = "0.36.1" mvicore = "1.2.6" coroutines = "1.6.4" kotlin = "1.7.10" -rxjava2 = "2.2.21" -rxandroid = "2.1.1" junit5 = "5.8.2" detekt = "1.21.0" dependencyAnalysis = "1.13.1" @@ -57,11 +55,13 @@ ribs-base-test = { module = "com.github.badoo.RIBs:rib-base-test", version.ref = ribs-base-test-activity = { module = "com.github.badoo.RIBs:rib-base-test-activity", version.ref = "ribs" } ribs-base-test-rx2 = { module = "com.github.badoo.RIBs:rib-base-test-rx2", version.ref = "ribs" } ribs-compose = { module = "com.github.badoo.RIBs:rib-compose", version.ref = "ribs" } -ribs-rx = { module = "com.github.badoo.RIBs:rib-rx2", version.ref = "ribs" } -rxjava2 = { module = "io.reactivex.rxjava2:rxjava", version.ref = "rxjava2" } -rxandroid = { module = "io.reactivex.rxjava2:rxandroid", version.ref = "rxandroid" } -rxrelay = "com.jakewharton.rxrelay2:rxrelay:2.1.1" +rxjava2 = "io.reactivex.rxjava2:rxjava:2.2.21" +rxjava3 = "io.reactivex.rxjava3:rxjava:3.1.5" +rxandroid2 = "io.reactivex.rxjava2:rxandroid:2.1.1" +rxandroid3 = "io.reactivex.rxjava3:rxandroid:3.0.2" +rxrelay2 = "com.jakewharton.rxrelay2:rxrelay:2.1.1" +rxrelay3 = "com.jakewharton.rxrelay3:rxrelay:3.0.1" kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlin" } junit-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit5" } diff --git a/libraries/interop-rx2/build.gradle.kts b/libraries/interop-rx2/build.gradle.kts index 5006f3984..86db0b1bd 100644 --- a/libraries/interop-rx2/build.gradle.kts +++ b/libraries/interop-rx2/build.gradle.kts @@ -26,7 +26,7 @@ android { dependencies { api(project(":libraries:core")) api(libs.rxjava2) - api(libs.rxrelay) + api(libs.rxrelay2) implementation(libs.kotlin.coroutines.rx2) implementation(libs.androidx.lifecycle.java8) diff --git a/libraries/interop-rx3/build.gradle.kts b/libraries/interop-rx3/build.gradle.kts new file mode 100644 index 000000000..e86b27b40 --- /dev/null +++ b/libraries/interop-rx3/build.gradle.kts @@ -0,0 +1,35 @@ +plugins { + id("com.android.library") + id("kotlin-android") + id("appyx-publish-android") + id("appyx-lint") + id("appyx-detekt") +} + +android { + namespace = "com.bumble.appyx.interop.rx3" + compileSdk = libs.versions.androidCompileSdk.get().toInt() + + defaultConfig { + minSdk = libs.versions.androidMinSdk.get().toInt() + targetSdk = libs.versions.androidTargetSdk.get().toInt() + + testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner" + } + testOptions { + unitTests.all { + it.useJUnitPlatform() + } + } +} + +dependencies { + api(project(":libraries:core")) + api(libs.rxjava3) + api(libs.rxrelay3) + + implementation(libs.androidx.lifecycle.java8) + + testImplementation(libs.junit.api) + testRuntimeOnly(libs.junit.engine) +} diff --git a/libraries/interop-rx3/lint-baseline.xml b/libraries/interop-rx3/lint-baseline.xml new file mode 100644 index 000000000..27ab162a6 --- /dev/null +++ b/libraries/interop-rx3/lint-baseline.xml @@ -0,0 +1,4 @@ + + + + diff --git a/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/Connectable.kt b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/Connectable.kt new file mode 100644 index 000000000..8dd9f604b --- /dev/null +++ b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/Connectable.kt @@ -0,0 +1,9 @@ +package com.bumble.appyx.interop.rx3.connectable + +import com.bumble.appyx.core.plugin.NodeLifecycleAware +import com.jakewharton.rxrelay3.Relay + +interface Connectable : NodeLifecycleAware { + val input: Relay + val output: Relay +} diff --git a/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/NodeConnector.kt b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/NodeConnector.kt new file mode 100644 index 000000000..f610176e3 --- /dev/null +++ b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/connectable/NodeConnector.kt @@ -0,0 +1,60 @@ +package com.bumble.appyx.interop.rx3.connectable + +import androidx.lifecycle.Lifecycle +import com.bumble.appyx.core.lifecycle.subscribe +import com.jakewharton.rxrelay3.PublishRelay +import com.jakewharton.rxrelay3.Relay +import io.reactivex.rxjava3.core.Observer + +class NodeConnector( + override val input: Relay = PublishRelay.create(), +) : Connectable { + + private val intake: Relay = PublishRelay.create() + private val exhaust: Relay = PublishRelay.create() + private var isFlushed = false + private val outputCache = mutableListOf() + + override val output: Relay = object : Relay() { + + override fun subscribeActual(observer: Observer) { + exhaust.subscribe(observer) + } + + override fun accept(value: Output) { + intake.accept(value) + } + + override fun hasObservers() = exhaust.hasObservers() + + } + + override fun onCreate(lifecycle: Lifecycle) { + lifecycle.subscribe(onCreate = { flushOutputCache() }) + } + + private val cacheSubscription = intake.subscribe { + synchronized(this) { + if (!isFlushed) { + outputCache.add(it) + } else { + exhaust.accept(it) + switchToExhaust() + } + } + } + + private fun flushOutputCache() { + synchronized(this) { + if (isFlushed) error("Already flushed") + isFlushed = true + outputCache.forEach { exhaust.accept(it) } + outputCache.clear() + } + } + + private fun switchToExhaust() { + intake.subscribe { exhaust.accept(it) } + cacheSubscription.dispose() + } +} diff --git a/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroy.kt b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroy.kt new file mode 100644 index 000000000..f0f1b5da4 --- /dev/null +++ b/libraries/interop-rx3/src/main/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroy.kt @@ -0,0 +1,17 @@ +package com.bumble.appyx.interop.rx3.plugin + +import com.bumble.appyx.core.plugin.Destroyable +import com.bumble.appyx.core.plugin.Plugin +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.disposables.Disposable + +private class DisposeOnDestroy(disposables: List) : Destroyable { + private val disposable = CompositeDisposable(disposables) + + override fun destroy() { + disposable.dispose() + } +} + +fun disposeOnDestroyPlugin(vararg disposables: Disposable): Plugin = + DisposeOnDestroy(disposables.toList()) diff --git a/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/connectable/Rx3NodeConnectorTest.kt b/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/connectable/Rx3NodeConnectorTest.kt new file mode 100644 index 000000000..53b907569 --- /dev/null +++ b/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/connectable/Rx3NodeConnectorTest.kt @@ -0,0 +1,269 @@ +package com.bumble.appyx.interop.rx3.connectable + +import androidx.lifecycle.DefaultLifecycleObserver +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.LifecycleObserver +import androidx.lifecycle.LifecycleOwner +import androidx.lifecycle.LifecycleRegistry +import com.bumble.appyx.interop.rx3.connectable.Rx3NodeConnectorTest.Output.Output1 +import com.bumble.appyx.interop.rx3.connectable.Rx3NodeConnectorTest.Output.Output2 +import com.bumble.appyx.interop.rx3.connectable.Rx3NodeConnectorTest.Output.Output3 +import io.reactivex.rxjava3.observers.TestObserver +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.RepeatedTest +import org.junit.jupiter.api.Test +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +internal class Rx3NodeConnectorTest { + + private val firstTestObserver = TestObserver() + private val secondTestObserver = TestObserver() + private var lifecycleState = Lifecycle.State.CREATED + private val lifecycle = object : Lifecycle() { + + private val lifecycleOwner = object : LifecycleOwner { + override fun getLifecycle(): Lifecycle = LifecycleRegistry(this) + } + + override fun addObserver(observer: LifecycleObserver) { + if (lifecycleState == State.CREATED) { + (observer as DefaultLifecycleObserver).onCreate(lifecycleOwner) + } + } + + override fun removeObserver(observer: LifecycleObserver) { + // Deliberately empty + } + + override fun getCurrentState() = lifecycleState + } + + sealed class Output { + object Output1 : Output() + object Output2 : Output() + object Output3 : Output() + } + + @AfterEach + fun tearDown() { + firstTestObserver.dispose() + secondTestObserver.dispose() + } + + @Test + fun `GIVEN nodeConnector onAttached is not called WHEN output is accepted THEN accepted output do not reach observer`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + + nodeConnector.output.accept(Output1) + + firstTestObserver.assertValueCount(0) + } + + @Test + fun `GIVEN an output is accepted before onAttached WHEN nodeConnector onAttached is called THEN accepted output reach the observer`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + + nodeConnector.output.accept(Output1) + nodeConnector.onCreate(lifecycle) + + firstTestObserver.assertValues(Output1) + } + + @Test + fun `GIVEN nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + + nodeConnector.onCreate(lifecycle) + nodeConnector.output.accept(Output1) + + firstTestObserver.assertValues(Output1) + } + + @Test + fun `GIVEN outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + + nodeConnector.output.accept(Output1) + nodeConnector.onCreate(lifecycle) + nodeConnector.output.accept(Output2) + nodeConnector.output.accept(Output3) + + firstTestObserver.assertValues(Output1, Output2, Output3) + } + + @Test + fun `WHEN nodeConnector onAttached is called twice THEN error is raised`() { + val nodeConnector = NodeConnector() + + nodeConnector.onCreate(lifecycle) + assertThrows(IllegalStateException::class.java) { + nodeConnector.onCreate(lifecycle) + } + } + + @Test + fun `GIVEN multiple observers and output is accepted before OnAttached WHEN nodeConnector onAttached is called THEN every accepted output reach the observers`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + nodeConnector.output.subscribe(secondTestObserver) + + nodeConnector.output.accept(Output1) + nodeConnector.onCreate(lifecycle) + + firstTestObserver.assertValues(Output1) + secondTestObserver.assertValues(Output1) + } + + @Test + fun `GIVEN multiple observers and nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + val nodeConnector = NodeConnector() + nodeConnector.output.subscribe(firstTestObserver) + nodeConnector.output.subscribe(secondTestObserver) + + nodeConnector.onCreate(lifecycle) + nodeConnector.output.accept(Output1) + + firstTestObserver.assertValues(Output1) + secondTestObserver.assertValues(Output1) + } + + @Test + fun `GIVEN multiple observers that subscribe before and after onAttached and outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + val nodeConnector = NodeConnector() + //First subscriber subscribe BEFORE onAttached + nodeConnector.output.subscribe(firstTestObserver) + + //Output accepted BEFORE onAttached + nodeConnector.output.accept(Output1) + nodeConnector.onCreate(lifecycle) + + //Second subscriber subscribe AFTER onAttached + nodeConnector.output.subscribe(secondTestObserver) + + //Outputs accepted AFTER onAttached + nodeConnector.output.accept(Output2) + nodeConnector.output.accept(Output3) + + firstTestObserver.assertValues(Output1, Output2, Output3) + secondTestObserver.assertValues(Output2, Output3) + + } + + + @Test + fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onAttached is called`() { + val nodeConnector = NodeConnector() + val threadNumber = 100 + val iterations = 10000 + val barrier = CyclicBarrier(threadNumber + 1) + val executor = Executors.newFixedThreadPool(threadNumber).apply { + repeat(threadNumber) { + submit { + barrier.awaitWithTimeOut() + repeat(iterations) { + nodeConnector.output.accept(Output1) + nodeConnector.output.accept(Output2) + nodeConnector.output.accept(Output3) + + } + } + } + } + + //Unlock threads(trip barrier) and wait for them to complete + barrier.awaitWithTimeOut() + executor.shutdown() + executor.awaitWithTimeOut() + + nodeConnector.output.subscribe(firstTestObserver) + nodeConnector.onCreate(lifecycle) + + firstTestObserver.assertValueCount(threadNumber * iterations * 3) + } + + + /** + * Why is this test repeated 1000 times? + * This test exist to ensure that there is no race condition issues under the threading scenario described by the test. + * To do so, a high amount of repetitions must be executed as when the race condition is happening, it could only produce + * an unexpected result in some % of the case. + * E.g: Race condition between Thread1 and Thread2 + * When Thread1 is executed first -> Desired Scenario + * When Thread2 is executed first -> Undesired Scenario + * Under this example if both threads get the lock with the same priority, the % of each case would be 50%. + * + * Why is the test logic duplicated? + * Seems that CyclicBarrier is a bit biased and is giving preference to the first thread awaiting to be executed first. + * So to balance this weighed thread execution priority, we switch order and test both in the same test to increase the + * % of failure when race condition issue is present. + */ + @RepeatedTest(1000) + fun `WHEN accept and onAttached are called by different thread at the same time THEN output is the expected`() { + val nodeConnector1 = NodeConnector() + val nodeConnector2 = NodeConnector() + val threadNumber = 2 + val barrier1 = CyclicBarrier(threadNumber + 1) + val barrier2 = CyclicBarrier(threadNumber + 1) + + val executor = Executors.newFixedThreadPool(threadNumber).apply { + //Emitter thread + submit { + barrier1.awaitWithTimeOut() + nodeConnector1.output.accept(Output1) + } + //Attacher thread + submit { + barrier1.awaitWithTimeOut() + nodeConnector1.onCreate(lifecycle) + } + } + val executor2 = Executors.newFixedThreadPool(threadNumber).apply { + //Attacher thread + submit { + barrier2.awaitWithTimeOut() + nodeConnector2.onCreate(lifecycle) + } + //Emitter thread + submit { + barrier2.awaitWithTimeOut() + nodeConnector2.output.accept(Output1) + } + } + + //subscribe nodes + nodeConnector1.output.subscribe(firstTestObserver) + nodeConnector2.output.subscribe(secondTestObserver) + + //Unlock threads(trip barrier) and wait for them to complete + barrier1.awaitWithTimeOut() + barrier2.awaitWithTimeOut() + executor.shutdown() + executor2.shutdown() + executor.awaitWithTimeOut() + executor2.awaitWithTimeOut() + + firstTestObserver.assertValues(Output1) + secondTestObserver.assertValues(Output1) + } + + + private fun CyclicBarrier.awaitWithTimeOut() { + await(TIME_OUT_S, TimeUnit.SECONDS) + } + + private fun ExecutorService.awaitWithTimeOut() { + awaitTermination(TIME_OUT_S, TimeUnit.SECONDS) + } + + companion object { + private const val TIME_OUT_S = 30L + } +} diff --git a/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroyTest.kt b/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroyTest.kt new file mode 100644 index 000000000..ce83d673b --- /dev/null +++ b/libraries/interop-rx3/src/test/kotlin/com/bumble/appyx/interop/rx3/plugin/DisposeOnDestroyTest.kt @@ -0,0 +1,45 @@ +package com.bumble.appyx.interop.rx3.plugin + +import com.bumble.appyx.core.plugin.Destroyable +import io.reactivex.rxjava3.disposables.Disposable +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertInstanceOf +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +internal class DisposeOnDestroyTest { + @Test + fun `WHEN dispose on destroy plugin created THEN verify is destroyable type`() { + assertInstanceOf(Destroyable::class.java, disposeOnDestroyPlugin()) + } + + @Test + fun `GIVEN dispose on destroy plugin created with disposable WHEN destroy THEN disposable is disposed`() { + val disposable = Disposable.empty() + val disposeOnDestroyPlugin = disposeOnDestroyPlugin(disposable) + + (disposeOnDestroyPlugin as Destroyable).destroy() + + assertTrue(disposable.isDisposed) + } + + @Test + fun `GIVEN dispose on destroy plugin created with multiple disposables WHEN destroy THEN all disposables are disposed`() { + val disposable1 = Disposable.empty() + val disposable2 = Disposable.empty() + val disposeOnDestroyPlugin = disposeOnDestroyPlugin(disposable1, disposable2) + + (disposeOnDestroyPlugin as Destroyable).destroy() + + assertTrue(disposable1.isDisposed) + assertTrue(disposable2.isDisposed) + } + + @Test + fun `WHEN dispose on destroy plugin created with disposable THEN disposable is not disposed`() { + val disposable = Disposable.empty() + disposeOnDestroyPlugin(disposable) + + assertFalse(disposable.isDisposed) + } +} diff --git a/samples/sandbox/build.gradle.kts b/samples/sandbox/build.gradle.kts index aa03fe072..539fb3151 100644 --- a/samples/sandbox/build.gradle.kts +++ b/samples/sandbox/build.gradle.kts @@ -68,8 +68,8 @@ dependencies { implementation(libs.mvicore.android) implementation(libs.mvicore.binder) implementation(libs.rxjava2) - implementation(libs.rxandroid) - implementation(libs.rxrelay) + implementation(libs.rxandroid2) + implementation(libs.rxrelay2) testImplementation(libs.androidx.arch.core.testing) testImplementation(libs.junit) diff --git a/settings.gradle.kts b/settings.gradle.kts index 5fb4adc2e..572d5c682 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,6 +21,7 @@ include( ":libraries:customisations", ":libraries:interop-ribs", ":libraries:interop-rx2", + ":libraries:interop-rx3", ":libraries:testing-junit4", ":libraries:testing-junit5", ":libraries:testing-ui",