Kotlin多平台响应式框架reaktive
Kotlin跨平台Reactive Extensions实现。
安装配置
这里有几个已经发布到 Maven Central 的模块:
reaktive
:Reaktive 的主要库(多平台)reaktive-annotations
:注解集合(多平台)reaktive-testing
:测试工具(多平台)utils
:一些实用工具,例如Clock、AtomicReference、Lock
等(多平台)coroutines-interop
:Kotlin 协程的互操作性辅助工具(多平台)rxjava2-interop
:RxJava v2 的互操作性辅助工具(JVM 和 Android)rxjava3-interop
:RxJava v3 的互操作性辅助工具(JVM 和 Android)
配置依赖
kotlin { sourceSets { commonMain { dependencies { implementation 'com.badoo.reaktive:reaktive:<version>' implementation 'com.badoo.reaktive:reaktive-annotations:<version>' implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2 implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3 } } commonTest { dependencies { implementation 'com.badoo.reaktive:reaktive-testing:<version>' } } }}
特征
跨平台支持:JVM、Android、iOS、macOS、watchOS、tvOS、JavaScript、Linux X64、Linux ARM 32 hfp
调度程序支持:
computationScheduler
– 固定线程池,线程数等于核心数ioScheduler
– 无限制线程池,使用缓存策略newThreadScheduler
– 为每个任务创建一个新线程singleScheduler
– 在单个共享后台线程上执行任务trampolineScheduler
– 将任务排队,并在参与的线程之一上执行它们mainScheduler
– 在主线程上执行任务- Kotlin/Native的真正多线程支持(有一些限制)
- 针对Kotlin/Native的线程本地订阅而不会冻结
- 支持的来源:
Observable、Maybe、Single、Completable
- Subjects:
PublishSubject、BehaviorSubject、ReplaySubject、UnicastSubject
- 与Kotlin协程的互操作性:在Reaktive和协程之间(包括Flow)之间进行转换
- 与RxJava2和RxJava3的互操作性:在Reaktive和RxJava之间进行源的转换,能够重用RxJava的调度程序
Reaktive和旧(严格的)Kotlin/Native内存模型
旧的(严格的)Kotlin Native内存模型和并发是非常特殊的。一般来说,线程之间不允许共享可变状态。由于Reaktive支持Kotlin Native中的多线程,请在使用之前阅读以下文档:
并发性
不变性
对象分离相对较难实现,当对象从外部创建且不完全由库管理时非常容易出错。这就是为什么Reaktive更喜欢冻结状态的原因。以下是一些提示:任何提交给调度程序的回调(以及任何被捕获的对象)都将被冻结
subscribeOn
会冻结其上游源和下游观察者,所有的Disposables
(上游和下游的)也都被冻结,所有的值(包括错误)都不是由该操作符冻结的observeOn
仅会冻结其下游观察者和通过它传递的所有值(包括错误),加上所有的Disposables
,上游源不会被该操作符冻结使用调度程序的其他操作符(如
debounce
、timer
、delay
等)在大多数情况下与observeOn
的行为相同
线程本地技巧以避免冻结
有时候不能接受冻结,例如我们可能希望在后台加载一些数据,然后更新UI。显然,UI 不能被冻结。使用Reaktive可以通过两种方式实现这种行为:
使用threadLocal
运算符:
val values = mutableListOf<Any>()var isFinished = falseobservable<Any> { emitter -> // Background job} .subscribeOn(ioScheduler) .observeOn(mainScheduler) .threadLocal() .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag .subscribe()
在 subscribe
运算符中将 isThreadLocal
标志设置为 true
val values = mutableListOf<Any>()var isComplete = falseobservable<Any> { emitter -> // Background job} .subscribeOn(ioScheduler) .observeOn(mainScheduler) .subscribe( isThreadLocal = true, onNext = { values += it }, // Callback is not frozen, we can update the mutable list onComplete = { isComplete = true } // Callback is not frozen, we can change the flag )
在这两种情况下,订阅(subscribe
调用)必须在主线程上执行。
Reaktive和新的(宽松的)Kotlin/Native内存模型
新的(宽松的)Kotlin/Native内存模型允许在线程之间传递对象而无需冻结。使用此内存模型时,不再需要使用threadLocal
运算符/参数。请确保按照文档中所述禁用冻结。
协程互操作性
这个功能由coroutines-interop
模块提供,分为两个版本发布:
coroutines-interop:
基于稳定的kotlinx.coroutines
– 使用稳定的协程版本和旧的(严格)内存模型使用此变体。
coroutines-interop:-nmtc
基于正在进行的多线程kotlinx.coroutines
– 使用此变体,无论是协程的多线程版本还是新的(放松的)内存模型。
基于稳定kotlinx.coroutines
的协程互操作性
有一些重要的限制:
直到多线程协程的发布,Job
和CoroutineContext
都无法被冻结。
由于第一个限制,在Kotlin/Native中,所有的xxxFromCoroutine {}
构建器和Flow.asObservable()
转换器都在runBlocking
块内执行,并应该在后台Scheduler
上进行订阅。
考虑以下coroutines-interop
示例:
singleFromCoroutine { // This block will be executed inside `runBlocking` in Kotlin/Native} .subscribeOn(ioScheduler) // Switching to a background thread is necessary .observeOn(mainScheduler) .subscribe { /* Get the result here */ }
请注意,Ktor
默认使用多线程协程。如果您正在使用Ktor
,请使用基于多线程协程的coroutines-interop
模块,并继续阅读下一个自述文件部分。
基于多线程kotlinx.coroutines
的协程Interop
多线程kotlinx.coroutines
变体消除了一些不愉快的限制- Job
和CoroutineContext
都可以被冻结。
因此,有一个关键的区别-所有的xxxFromCoroutine {}
构建器和Flow.asObservable()
转换器在所有目标中(包括Kotlin/Native)都是异步执行的,因此可以在任何调度器上进行订阅。
注:
- 由于多线程协程仍在进行中,可能会出现问题。
Ktor
可以直接使用,没有已知的限制
协程Interop
的通用限制
转换器Scheduler.asCoroutineDispatcher()
和CoroutineContext.asScheduler()
目前仅在JVM和JS中可用。
使用DisposableScope
进行订阅管理
Reaktive提供了一种方便的订阅管理方式:DisposableScope
。
请查看以下示例:
val scope = disposableScope { observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed doOnDispose { // Will be called when the scope is disposed } someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed }// At some point laterscope.dispose()
class MyPresenter( private val view: MyView, private val longRunningAction: Completable) : DisposableScope by DisposableScope() { init { doOnDispose { // Will be called when the presenter is disposed } } fun load() { view.showProgressBar() // Subscription will be disposed when the presenter is disposed longRunningAction.subscribeScoped(onComplete = view::hideProgressBar) }}class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) MyPresenter(...).scope() } override fun onDestroy() { dispose() super.onDestroy() }}
Reaktive 和 Swift 的互操作性
请参阅相应的文档页面:Reaktive 和 Swift 的互操作性。
https://github.com/badoo/Reaktive/blob/master/docs/SwiftInterop.md
插件
Reaktive 提供了 Plugin API,类似于 RxJava 插件。Plugin API 提供了一种装饰 Reaktive 源的方式。插件应该实现 ReaktivePlugin
接口,并可以使用 registerReaktivePlugin
函数进行注册,并使用 unregisterReaktivePlugin
函数进行取消注册。
object MyPlugin : ReaktivePlugin { override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> = object : Observable<T> { private val traceException = TraceException() override fun subscribe(observer: ObservableObserver<T>) { observable.subscribe( object : ObservableObserver<T> by observer { override fun onError(error: Throwable) { observer.onError(error, traceException) } } ) } } override fun <T> onAssembleSingle(single: Single<T>): Single<T> = TODO("Similar to onAssembleSingle") override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> = TODO("Similar to onAssembleSingle") override fun onAssembleCompletable(completable: Completable): Completable = TODO("Similar to onAssembleSingle") private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) { if (error.suppressedExceptions.lastOrNull() !is TraceException) { error.addSuppressed(traceException) } onError(error) } private class TraceException : Exception()}
github项目主页
https://github.com/badoo/Reaktive
一些代码示例
MPP module
https://github.com/badoo/Reaktive/tree/master/sample-mpp-module
Android app
https://github.com/badoo/Reaktive/tree/master/sample-android-app
iOS app
https://github.com/badoo/Reaktive/tree/master/sample-ios-app
JavaScript browser app
https://github.com/badoo/Reaktive/tree/master/sample-js-browser-app
Linux x64 app
https://github.com/badoo/Reaktive/tree/master/sample-linuxx64-app