RxJava 是 Reactive Extensions 的 Java VM 实现:它是一个使用可观察序列编写异步和基于事件的程序的库。
它扩展了 观察者模式,以支持数据/事件序列,并添加了运算符,允许您以声明式的方式组合序列,同时抽象出诸如低级线程、同步、线程安全和并发数据结构等问题。
在 Wiki 主页 上了解有关 RxJava 的更多信息。
:information_source: 请阅读 3.0 中的不同之处,了解从 2.x 升级时的变化和迁移信息。
2.x 版本 将于2021 年 2 月 28 日停止服务。此后将不再进行任何开发、支持、维护、PR 和更新。最新版本 2.2.21 的 Javadoc 将保留访问权限。
1.x 版本 将于2018 年 3 月 31 日停止服务。此后将不再进行任何开发、支持、维护、PR 和更新。最新版本 1.3.8 的 Javadoc 将保持开放。
第一步是将 RxJava 3 添加到您的项目中,例如,将其作为 Gradle 编译依赖项:
implementation "io.reactivex.rxjava3:rxjava:3.x.y"
第二步是编写 Hello World 程序:
package rxjava.examples;
import io.reactivex.rxjava3.core.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
请注意,RxJava 3 组件现在位于“io.reactivex.rxjava3”下,而基类和接口位于“io.reactivex.rxjava3.core”下。
RxJava 3 提供了几个基类,您可以在其中找到操作符:
io.reactivex.rxjava3.core.Flowable
:0 到 N 个流,支持 Reactive-Streams 和背压io.reactivex.rxjava3.core.Observable
:0 到 N 个流,不支持背压io.reactivex.rxjava3.core.Single
:一个包含 1 个元素的流,否则会抛出错误io.reactivex.rxjava3.core.Completable
: 一个流没有元素,但只有一个完成或错误信号。io.reactivex.rxjava3.core.Maybe
: 一个流没有元素,只有一个元素或一个错误。RxJava 中的数据流由一个源、零个或多个中间步骤以及一个数据消费者或组合器步骤(其中该步骤负责通过某种方式消费数据流)组成:
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
这里,如果我们想象自己在 operator2
上,向左看向源头的方向称为上游。向右看向订阅者/消费者的方向称为下游。当每个元素写在单独的一行时,这一点通常更加明显:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
在 RxJava 的文档中,emission、emits、item、event、signal、data 和 message 被视为同义词,表示沿着数据流传输的对象。
当数据流执行异步步骤时,每个步骤可能会以不同的速度执行不同的操作。为了避免这些步骤过载(这通常表现为由于临时缓冲或需要跳过/丢弃数据而导致的内存占用增加),会应用所谓的背压。背压是一种流控制形式,其中步骤可以表示它们准备处理的项目数量。这允许在通常无法知道上游将向其发送多少个项目的情况下限制数据流的内存使用量。
在 RxJava 中,专用的 Flowable
类被指定用于支持背压,而 Observable
则专用于非背压操作(短序列、GUI 交互等)。其他类型,例如 Single
、Maybe
和 Completable
,不支持背压,也不应该支持;总有空间可以临时存储一个数据项。
通过应用各种中间操作符来准备数据流,这个过程发生在所谓的组装时间中:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
此时,数据尚未流动,也没有任何副作用发生。
当在流上调用 subscribe()
时,这是一个临时状态,该流在内部建立了处理步骤链:
flow.subscribe(System.out::println)
此时会触发订阅副作用(参见 doOnSubscribe
)。某些数据源会在此状态下立即阻塞或开始发送数据。
当流主动发送数据、错误或完成信号时,此状态处于此状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
实际上,这正是上面示例代码执行的时间。
RxJava 的一个常见用例是在后台线程上运行一些计算、网络请求,并在 UI 线程上显示结果(或错误):
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
这种链式方法被称为“流畅 API”,类似于“构建器模式”。然而,RxJava 的响应式类型是不可变的;每次方法调用都会返回一个带有附加行为的新 Flowable。为了说明这一点,该示例可以重写如下:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,您可以通过 subscribeOn
将计算或阻塞 IO 移至其他线程。数据准备就绪后,您可以通过 observeOn
确保它们在前台或 GUI 线程上处理。
RxJava 操作符不直接与 Thread
或 ExecutorService
交互,而是与所谓的 Scheduler
交互,这些 Scheduler
将并发源抽象到统一的 API 后面。RxJava 3 提供了几个可通过 Schedulers
实用程序类访问的标准调度器。
Schedulers.computation()
:在后台固定数量的专用线程上运行计算密集型工作。大多数异步操作符都将其用作默认的 Scheduler
。Schedulers.io()
:在动态变化的线程集上运行类似 I/O 或阻塞操作。Schedulers.single()
:以顺序和先进先出的方式在单个线程上运行工作。Schedulers.trampoline()
:在参与线程之一中以顺序和先进先出的方式运行工作,通常用于测试目的。这些方法在所有 JVM 平台上都可用,但某些特定平台(例如 Android)定义了其自己的典型 Scheduler
:AndroidSchedulers.mainThread()
、SwingScheduler.instance()
或 JavaFXScheduler.platform()
。
此外,还可以通过 Schedulers.from(Executor)
将现有的 Executor
(及其子类型,例如 ExecutorService
)包装到 Scheduler
中。例如,这可以用来获得更大但仍然固定的线程池(与 computation()
和 io()
不同)。
末尾的 Thread.sleep(2000);
并非偶然。在 RxJava 中,默认的“Scheduler”在守护线程上运行,这意味着一旦 Java 主线程退出,所有线程都会停止,后台计算可能永远不会发生。在本例中,休眠一段时间可以让您在控制台上看到流的输出,并留出一些时间。
RxJava 中的流本质上是顺序的,分为多个可以彼此并发运行的处理阶段:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
此示例流程在计算 Scheduler
上对 1 到 10 的数字进行平方,并在“主”线程(更准确地说,是 blockingSubscribe
的调用线程)上使用结果。然而,lambda v -> v * v
并非在此流程中并行运行;它会在同一个计算线程上依次接收 1 到 10 的值。
并行处理 1 到 10 的数字稍微复杂一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
实际上,RxJava 中的并行性意味着运行独立的 Flow,并将它们的结果合并回单个 Flow。运算符 flatMap
首先将 1 到 10 的每个数字映射到其各自的 Flowable
中,然后运行它们并合并计算出的平方。
但请注意,flatMap
不保证任何顺序,内部 Flow 中的项最终可能会交错。还有其他运算符:
concatMap
一次映射并运行一个内部 Flow;concatMapEager
一次运行所有内部 Flow,但输出 Flow 将按照这些内部 Flow 的创建顺序运行。或者,Flowable.parallel()
运算符和 ParallelFlowable
类型有助于实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
flatMap
是一个强大的运算符,在很多情况下都能派上用场。例如,给定一个返回 Flowable
的服务,我们希望使用第一个服务发出的值来调用另一个服务:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
有时,当某个项目可用时,人们希望对其进行一些依赖计算。这有时被称为延续,根据应发生的情况和涉及的类型,可能涉及各种操作符来完成。
最典型的场景是输入一个值,调用另一个服务,等待并继续执行其结果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常情况下,后面的序列也需要来自前面映射的值。这可以通过将外部的 flatMap 移动到前一个 flatMap 的内部来实现,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
在这里,原始的 value
将在内部 flatMap
中可用,这得益于 lambda 变量捕获。
在其他情况下,第一个源/数据流的结果无关紧要,人们希望使用另一个准独立的源继续执行。在这里,flatMap
也可以正常工作:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
但是,在这种情况下,继续执行仍然是 Observable
,而不是可能更合适的 Single
。(这是可以理解的,因为
从 flatMapSingle
的角度来看,sourceObservable
是一个多值源,因此映射也可能产生多个值)。
不过,通常有一种更具表现力(并且开销更低)的方法,即使用 Completable
作为中介,并使用其操作符 andThen
继续执行其他操作:
sourceObservable
.ignoreElements() // 返回 Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable
和 someSingleSource
之间的唯一依赖关系是,前者必须正常完成,后者才能被消费。
有时,前一个序列和新序列之间存在隐式数据依赖关系,由于某种原因,新序列没有通过“常规通道”传输。人们倾向于将此类延续写成如下形式:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,这会打印 0
,因为 Single.just(count.get())
是在汇编时执行的,而数据流甚至还没有运行。我们需要一些东西来将对这个 Single
源的执行推迟到运行时,即主源完成时:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
有时,源或服务返回的类型与预期使用的流不同。例如,在上面的库存示例中,getDemandAsync
可能返回 Single<DemandRecord>
。如果代码示例保持不变,则会导致编译时错误(但通常会显示一条误导性的错误消息,提示缺少重载)。
在这种情况下,通常有两种方法可以修复转换:1) 转换为所需类型;2) 查找并使用支持不同类型的特定运算符的重载。
每个响应式基类都具有可以执行此类转换(包括协议转换)以匹配其他类型的运算符。以下矩阵显示了可用的转换选项:
Flowable | Observable | Single | Maybe | Completable | |
---|---|---|---|---|---|
Flowable | toObservable |
first , firstOrError , single , singleOrError , last , lastOrError 1 |
firstElement , singleElement , lastElement |
ignoreElements |
|
Observable | toFlowable 2 |
first , firstOrError , single , singleOrError , last , lastOrError 1 |
firstElement , singleElement , lastElement |
ignoreElements |
|
Single | toFlowable 3 |
toObservable |
toMaybe |
ignoreElement |
|
Maybe | toFlowable 3 |
toObservable |
toSingle |
ignoreElement |
|
Completable | toFlowable |
toObservable |
toSingle |
toMaybe |
1:将多值源转换为单值源时,应该决定将众多源值中的哪一个作为结果。
2:将 Observable
转换为 Flowable
需要额外决策:如何处理源 Observable
潜在的不受约束的流?
您可以通过 BackpressureStrategy
参数或标准 Flowable
操作符(例如 onBackpressureBuffer
、onBackpressureDrop
和 onBackpressureLatest
)提供多种策略(例如缓冲、丢弃、保持最新),这些操作符也允许进一步自定义背压行为。
3:当只有(最多)一个源项时,背压不会有问题,因为它可以一直存储,直到下游准备好消费为止。
许多常用的操作符都有可以处理其他类型的重载。这些通常以目标类型的后缀命名:
Operator | Overloads |
---|---|
flatMap |
flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap |
concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap |
switchMapSingle , switchMapMaybe , switchMapCompletable |
这些操作符之所以带有后缀,而不是简单地使用相同名称但签名不同的形式,是因为类型擦除。Java 不会将诸如 operator(Function<T, Single<R>>)
和 operator(Function<T, Maybe<R>>)
之类的签名视为不同(与 C# 不同),并且由于类型擦除,这两个 operator
最终会被视为具有相同签名的重复方法。
在编程中,命名是最难的事情之一,因为名称应该不长、富有表现力、简洁易记。不幸的是,目标语言(以及现有的约定)在这方面可能没有提供太多帮助(例如不可用的关键字、类型擦除、类型歧义等)。
在原始的 Rx.NET 中,发出单个值然后完成的操作符称为 Return(T)
。由于 Java 约定以小写字母开头方法名,因此该方法名应该是 return(T)
,而这在 Java 中是一个关键字,因此无法使用。因此,RxJava 选择将此操作符命名为 just(T)
。操作符 Switch
也存在同样的限制,必须命名为 switchOnNext
。另一个例子是 Catch
,它被命名为 onErrorResumeNext
。
许多期望用户提供返回响应式类型的函数的操作符无法重载,因为 Function<T, X>
周围的类型擦除会将此类方法签名变为重复。 RxJava 也选择在命名此类操作符时附加类型作为后缀:
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
即使某些操作符不会因类型擦除而产生问题,它们的签名也可能出现歧义,尤其是在使用 Java 8 和 lambda 表达式时。例如,concatWith
有多个重载,它们以各种其他响应式基类型作为参数(以便在底层实现中提供便利性和性能优势):
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher
和 SingleSource
都以函数式接口(具有一个抽象方法的类型)的形式出现,可能会鼓励用户尝试提供 lambda 表达式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
不幸的是,这种方法不起作用,示例根本没有打印出 2
。事实上,从 2.1.10 版本开始,它甚至无法编译通过,
因为至少存在 4 个 concatWith
重载,编译器认为上面的代码存在歧义。
在这种情况下,用户可能希望将某些计算推迟到 someSource
完成,因此正确的
无歧义运算符应该是 defer
:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有时,需要添加后缀以避免逻辑歧义,这些歧义可能会编译通过,但在流中生成错误的类型:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
当函数式接口类型作为类型参数 T
时,这种情况也会产生歧义。
数据流可能会失败,此时错误会发送给消费者。但有时,多个源可能会失败,此时可以选择等待所有源完成或失败。为了指示这种情况,许多运算符名称都带有 DelayError
后缀(而其他运算符在其某个重载中包含 delayError
或 delayErrors
布尔标志):
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
当然,各种后缀也可能同时出现:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
基类由于其静态方法和实例方法数量众多,因此可以被认为是繁重的。RxJava 3 的设计深受 Reactive Streams 规范的影响,因此,该库为每个响应式类型都提供了一个类和一个接口:
Type | Class | Interface | Consumer |
---|---|---|---|
0..N backpressured | Flowable |
Publisher 1 |
Subscriber |
0..N unbounded | Observable |
ObservableSource 2 |
Observer |
1 element or error | Single |
SingleSource |
SingleObserver |
0..1 element or error | Maybe |
MaybeSource |
MaybeObserver |
0 element or error | Completable |
CompletableSource |
CompletableObserver |
1org.reactivestreams.Publisher
是外部 Reactive Streams 库的一部分。它是通过受 Reactive Streams 规范 约束的标准化机制与其他响应式库交互的主要类型。
2该接口的命名约定是在半传统的类名后附加 Source
。由于 Publisher
由 Reactive Streams 库提供(并且对其进行子类型化也无助于互操作),因此没有 FlowableSource
。然而,这些接口并非 Reactive Streams 规范意义上的标准接口,目前仅适用于 RxJava。
默认情况下,RxJava 本身不需要任何 ProGuard/R8 设置,应该可以正常工作。不幸的是,自 1.0.3 版本以来,Reactive Streams 依赖项在其 JAR 中嵌入了 Java 9 类文件,这可能会导致普通 ProGuard 发出警告:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
建议在应用程序的 proguard-ruleset
文件中设置以下 -dontwarn
条目:
-dontwarn java.util.concurrent.Flow*
对于 R8,RxJava jar 包含 META-INF/proguard/rxjava3.pro
,并带有相同的 no-warning 子句,应该会自动应用。
更多详情,请参阅 wiki。
3.x 版本正在开发中。错误修复将应用于 2.x 和 3.x 分支,但新功能将仅添加到 3.x 版本。
当添加了重要的新功能、重大的增强或错误修复时,将进行 3.x 的小版本更新(例如 3.1、3.2 等),这些更新可能会带来行为上的改变,从而影响某些极端情况(例如对错误导致的行为的依赖)。一个可以归类为此类的增强示例是,为之前不支持该功能的运算符添加响应式拉取背压支持。这应该向后兼容,但行为会有所不同。
补丁 3.x.y 版本更新(例如 3.0.0 -> 3.0.1、3.3.1 -> 3.3.2 等)将用于修复错误和添加一些小功能(例如添加方法重载)。带有 [@Beta
][beta 源链接] 或 [@Experimental
][experimental 源链接] 注释的新功能也可以添加到补丁版本中,以便快速探索和迭代不稳定的新功能。
http://reactivex.io/RxJava/3.x/javadoc/3.x.y/
Maven、Ivy、Gradle 和其他库的二进制文件和依赖项信息可在 http://search.maven.org 找到。
Gradle 示例:
implementation 'io.reactivex.rxjava3:rxjava:x.y.z'
Maven 示例:
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>x.y.z</version>
</dependency>
Ivy 示例:
<dependency org="io.reactivex.rxjava3" name="rxjava" rev="x.y.z" />
2025 年 5 月 19 日之后的快照可通过以下方式获取https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/
repositories {
maven { url 'https://central.sonatype.com/repository/maven-snapshots' }
}
dependencies {
implementation 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}
JavaDoc 快照可在 https://reactivex.io/RxJava/3.x/javadoc/snapshot 获取
构建方法:
$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
更多构建细节,请参阅 获取请参阅 Wiki 的 Started 页面。
如有错误、疑问或讨论,请使用 Github Issues。
版权所有 (c) 2016 年至今,RxJava 贡献者。
根据 Apache 许可证 2.0 版(“许可证”)授权;
除非遵守本许可证,否则您不得使用此文件。
您可以在以下位置获取许可证副本:
http://www.apache.org/licenses/LICENSE-2.0
除非适用法律要求或书面同意,否则根据本许可证分发的软件
均按“原样”分发,不附带任何明示或暗示的保证或条件。