前言

我们在查看 Spring Cloud 源码的时候,会发现已经引入了 Mono 或者 Flux 相关的代码,如果对这些代码不熟悉,就会觉得有些 Spring Cloud 源码将会变得晦涩难懂。Mono 和 Flux 为 ProjectReactor 响应式框架中的核心类。其相关概念可以参考 Flux、Mono、Reactor 实战(史上最全)和

响应式编程入门之 Project Reactor。我在参考了这些文章后,查看了相应的源码,这里是将自己的理解记录下来,希望可以帮助到初学者理解 ProjectReactor 。本文的目标是可以让大家理解以下者行代码的实现逻辑。

Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);

核心接口

本文使用的是 java 8 ,项目中需要引入以下依赖

io.projectreactorreactor-core3.4.24

ProjectReactor 其核心思想观察者模式,定义了发布者和订阅者以及订阅三个接口。其核心接口如下

/** * 当将一个 Subscriber 传递给 Publisher#subscribe(Subscriber) 方法的时候,Subscriber#onSubscribe(Subscription) 会被调用 */public interface Subscriber {/** * 在 Publisher#subscribe(Subscriber) 调用时,被调用. * 他的职责是去调用 Subscription#request(long) 方法 */public void onSubscribe(Subscription s);/** * 当 Subscription#request(long)调用的时候被调用,数据T来自 Publisher */public void onNext(T t);/** * 发生异常时被触发 */public void onError(Throwable t);/** * 正常完成时被触发 */public void onComplete();}/** * Subscription 代表 Subscriber 订阅一个 Publisher 的生命周期,只能被一个 Subscriber 使用一次。 */public interface Subscription {/** * 通常是 Subscriber#onSubscribe(Subscription)时被触发,然后调用 Subscriber#onNext(T)方法 */public void request(long n);/** * 停止发送数据和清理相关资源 */public void cancel();}/** * 一个发布者可以发布无数个元素给订阅者,被多个 Subscriber 进行订阅。 */public interface Publisher {/** * 这个方法用于开始一个数据流,可以被调用多次,每次都会生成一个新的 Subscription 对象,每个 Subscription 对象只能被一个 Subscriber 使用。每个 Subscriber 最多订阅一次一个 Publisher。但是 Publisher 可以被多个 Subscriber 订阅。 */public void subscribe(Subscriber s);}

仔细看官方接口的文档,可以看到核心调用逻辑如下:

  1. 首先调用 Publisher#subscribe(Subscriber)方法,传入了一个 Subscriber。

  1. 然后 Subscriber#onSubscribe(Subscription),传入了一个 Subscription。

  1. 然后 Subscription#request(long) 会被触发。

  1. 然后 Subscriber#onNext(T) 会被触发。

每个方法,都规定了具体的职责。

看到这里我就有一个疑问了:

为什么需要这样弯弯绕绕,而且需要引入一个中间的对象 Subscription 传递数据?
Publisher 提供数据,Subscriber 直接去获取不就好了吗?

这里先给出我的理解:

形成数据流,中间可以引入多个处理过程。
可以在 Subscriber 订阅的时候,整个处理流程才动起来。

这里先有一个印象,后续可以再回头看就理解了。

按照核心逻辑,我们对以上三个接口进行简单实现。

简单实现

import org.reactivestreams.Publisher;import org.reactivestreams.Subscriber;import org.reactivestreams.Subscription;public class Demo {static class MySubscription implements Subscription{private String data;private Subscriber subscriber;public MySubscription(String data, Subscriber subscriber) {this.data = data;this.subscriber = subscriber;}private boolean isCanceled;@Overridepublic void request(long l) {if (!isCanceled){try {subscriber.onNext(data);subscriber.onComplete();} catch (Exception e) {subscriber.onError(e);}}}@Overridepublic void cancel() {isCanceled = true;}}staticclass MyPublisher implements Publisher{private String data;@Overridepublic void subscribe(Subscriber subscriber) {subscriber.onSubscribe(new MySubscription(data, subscriber));}public static Publisher just(String a){MyPublisher myPublisher = new MyPublisher();myPublisher.data = a;return myPublisher;}}static class MySubscriber implements Subscriber{@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(1L);}@Overridepublic void onNext(Object o) {System.out.println(o);}@Overridepublic void onError(Throwable throwable) {System.out.println("error");}@Overridepublic void onComplete() {System.out.println("completed");}}public static void main(String[] args) {MyPublisher.just("MyPublisher1").subscribe(new MySubscriber());// 打印如下:// MyPublisher1//completed}}

是不是特别简单,一个简单的发布订阅流程就完成了。

但很明显扩展性不强,比如我想在 MyPublisher 和 MySubscriber 之间,做一些不限次数的处理逻辑,而且写法跟Stream类似,怎么办呢?

为了完成这个逻辑,需要引入一些操作类(operator)的对象,包括 Publisher,Subscriber 以及 Subscription 的实现类。

引入中间操作类的实现

import org.reactivestreams.Publisher;import org.reactivestreams.Subscriber;import org.reactivestreams.Subscription;import java.util.function.Function;public class OperatorPublisherTest {static class MySubscription implements Subscription{private String data;private Subscriber subscriber;public MySubscription(String data, Subscriber subscriber) {this.data = data;this.subscriber = subscriber;}private boolean isCanceled;@Overridepublic void request(long l) {System.out.println("MySubscription:调用 MySubscription#request(long)");if (!isCanceled){try {subscriber.onNext(data);subscriber.onComplete();} catch (Exception e) {subscriber.onError(e);}}}@Overridepublic void cancel() {isCanceled = true;}}/** * 抽象类,用于引入操作类的 Publisher */static abstract class AbstractPublisher implements Publisher{public OperatorPublisher operator(Function function,String name ){return new OperatorPublisher(this,function,name);}}/** * SourcePublisher 用于获取最初始的数据 */staticclass SourcePublisher extends AbstractPublisher{private String name;private String data;public SourcePublisher(String name) {this.name = name;println("生成 SourcePublisher ");}@Overridepublic void subscribe(Subscriber subscriber) {println("SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象");subscriber.onSubscribe(new MySubscription(data, subscriber));}public static AbstractPublisher just(String a){SourcePublisher myPublisher = new SourcePublisher("数据源Publisher");myPublisher.data = a;return myPublisher;}void println(String context){System.out.println(name+": "+context);}}static class OperatorPublisher extends AbstractPublisher{private String name;private Publisher source;private OperatorPublisher prePublisher;Function mapper;public OperatorPublisher(Publisher prePublisher,Function function,String name) {this.name = name;this.source = prePublisher;if (prePublisher instanceof OperatorPublisher){this.prePublisher = (OperatorPublisher) prePublisher;}mapper = function;print("生成 OperatorPublisher 对象");}@Overridepublic void subscribe(Subscriber s) {print("订阅:OperatorPublisher#subscribe(Subscriber)");OperatorPublisher currentPublisher = this;Subscriber currentSubscriber = s;while (true){currentSubscriber = currentPublisher.subscribeOrReturn(currentSubscriber);OperatorPublisher nextPublisher = currentPublisher.prePublisher;if (nextPublisher == null){currentPublisher.source.subscribe(currentSubscriber);return;}currentPublisher = nextPublisher;}}public OperatorSubscriptionSubscriber subscribeOrReturn(Subscriber s){print("OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象");return new OperatorSubscriptionSubscriber(mapper,s,name);}public void print(String text){System.out.println(name+": "+text);}}static class OperatorSubscriptionSubscriber implements Subscriber,Subscription{private String name;private Subscriber preSubscriber;private Function function;private Subscription preSubscription;public OperatorSubscriptionSubscriber(Function f, Subscriber preSubscriber, String name) {this.function = f;this.preSubscriber = preSubscriber;this.name = name+"_OperatorSubscriptionSubscriber";}private boolean isCanceled;@Overridepublic void request(long l) {println("OperatorSubscriptionSubscriber#request(long)");preSubscription.request(l);}@Overridepublic void onSubscribe(Subscription preSubscription) {println("OperatorSubscriptionSubscriber#onSubscribe(Subscription)");this.preSubscription = preSubscription;preSubscriber.onSubscribe(this);}@Overridepublic void onNext(Object o) {println("OperatorSubscriptionSubscriber#onNext(Object)");Object apply = function.apply(o);println("处理后的值为:"+apply);preSubscriber.onNext(apply);}@Overridepublic void onError(Throwable t) {preSubscriber.onError(t);}@Overridepublic void onComplete() {preSubscriber.onComplete();}@Overridepublic void cancel() {isCanceled = true;}void println(String context){System.out.println(name+": "+context);}}static class MySubscriber implements Subscriber{private String name = "最终订阅者MySubscriber:";@Overridepublic void onSubscribe(Subscription subscription) {println("MySubscriber#onSubscribe(Subscription)");subscription.request(1L);}@Overridepublic void onNext(Object o) {println("最终订阅者MySubscriber: MySubscriber onNext");println("最终订阅者MySubscriber:结果输出 "+o);}@Overridepublic void onError(Throwable throwable) {println("error");}@Overridepublic void onComplete() {println("MySubscriber#onComplete()");}void println(String context){System.out.println(name+context);}}public static void main(String[] args) {SourcePublisher.just("MyPublisher1")// 生成 SourcePublisher.operator(e->e+" operator1 ","操作对象1")// 生成 OperatorPublisher.operator(e->e+" operator2 ","操作对象2") // 生成 OperatorPublisher.subscribe(new MySubscriber()); // 生成了多个嵌套的Subscriber 回到 SourcePublisher 进行 onSubscribe 调用链}}

涉及的接口总结如下

接口

普通实现

操作类实现

Publisher

SourcePublisher

OperatorPublisher

Subscriber

MySubscriber

OperatorSubscriptionSubscriber

Subscription

MySubscription

OperatorSubscriptionSubscriber

接口对比如下

逻辑如下:

引入了操作者的概念(OperatorPublisher),本身是一个 Publisher ,可以通过 source 和 parentPublisher 变量将所有的 Publisher 组织成为一个 Publisher 单向链表,注意的是OperatorPublisher还包含一个函数式接口Function,这个是真正处理数据的地方。
引入了新的操作类型的 Subscriber(同时也是一个 Subscription),当 Publisher 链表被订阅的时候启动遍历 Publisher ,在遍历的过程中也形成了一个 Subscriber 的链表,链表最后一个 Subscriber 为真正的订阅者。
当遍历至 Publisher 链表最后一个元素的时候, Subscriber 的队列也已经构建完成。
调用最后一个 Publisher 的 subscribe 方法,执行 onSubscribe 方法,这个方法又会将 Subscriber 的链表遍历一遍,形成一个 Subscription 链表。
当 Subscriber 的链表遍历至最后一个的时候,通过调用 Subscriber 的 onSubscribe 方法,将会执行 subscription#request(long),这里也是一个遍历的方法,遍历 Subscription 到最后一个元素。
执行最后一个 Subscription 的 request 方法,依次执行 Subscriber 链表中的 Subscriber 的 onNext方法。
在操作类型的 Subscriber 中,含有对数据的处理逻辑,数据依次被处理后,最终的 Subscriber,完成最后的输出。

流程图如下:

打印效果如下(为了方便描述加上了行号):

  1. 数据源 Publisher: 生成 SourcePublisher

  1. 操作对象1: 生成 OperatorPublisher 对象

  1. 操作对象2: 生成 OperatorPublisher 对象

  1. 操作对象2: 订阅:OperatorPublisher#subscribe(Subscriber)

  1. 操作对象2: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象

  1. 操作对象1: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象

  1. 数据源Publisher: SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)

  1. 最终订阅者MySubscriber:MySubscriber#onSubscribe(Subscription)

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)

  1. MySubscription: 调用 MySubscription#request(long)

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)

  1. 操作对象1_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)

  1. 操作对象2_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1 operator2

  1. 最终订阅者MySubscriber:最终订阅者MySubscriber: MySubscriber onNext

  1. 最终订阅者MySubscriber:最终订阅者MySubscriber:结果输出 MyPublisher1 operator1 operator2

  1. 最终订阅者MySubscriber:MySubscriber#onComplete()

以上,一个完整的发布订阅逻辑已经处理完成,通过查看流程,可以看到

Publisher 在调用 subscribe 方法之前,只是构建了一个 Publisher 链表,没有发生任何数据处理逻辑。
查看 Publisher 是不存在任何状态的,也就是说,这个 Publisher 链表是可以重复使用的。
当调用了 subscribe 方法之后,数据处理逻辑开始开动,而且查看 Subscriber,

Mono 的工作流程

当你看完前面的逻辑并已经理解了之后,查看 Mono 的工作流程就会变得比较简单

首先一行最简单的代码如下:

import reactor.core.publisher.Mono;public class MonoTest {public static void main(String[] args) {Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);}}

首先,查看Mono类,发现他是一个抽象类

public abstract class Mono implements CorePublisher {}

是一个 Publisher 的实现类,当他调用 just(T) 方法时,会返回一个 MonoJust,MonoJust 继承了 Mono。

public static  Mono just(T data) {return onAssembly(new MonoJust(data));}

然后调用 map(Function) 时,会返回一个 MonoMapFuseable ,MonoMapFuseable 也是继承了 Mono。继续调用 map(Function),也是如此。

public final  Mono map(Function 

至此,Publisher 的两个实现类已经出现 MonoJust 和 MonoMapFuseable。MonoJust 负责存储原始数据,MonoMapFuseable 用于处理数据,并负责构建 Publisher 链表。看下两个类的实现

final class MonoJust extends Monoimplements Fuseable.ScalarCallable, Fuseable, SourceProducer{final T value;// 将数据存储在 value 变量中。MonoJust(T value) {this.value = Objects.requireNonNull(value, "value");}@Overridepublic void subscribe(CoreSubscriber subscriber) {OptimizableOperator operator = this;try {while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {operator.source().subscribe(subscriber);return;}operator = newSource;}}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}@Override@SuppressWarnings("unchecked")public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) {if (actual instanceof ConditionalSubscriber) {ConditionalSubscriber cs = (ConditionalSubscriber) actual;return new FluxMapFuseable.MapFuseableConditionalSubscriber(cs, mapper);}return new FluxMapFuseable.MapFuseableSubscriber(actual, mapper);}}

注意在 subscribeOrReturn 方法中返回了一个 MapFuseableConditionalSubscriber

可以看到 MapFuseableConditionalSubscriber 既是一个 Subscrition 也是一个 Subscriber。用来构建 Subscriber 链表和 Subscrition 链表,而它存有 MonoMapFuseable 传进来的 Functionstatic final class MapFuseableConditionalSubscriberimplements ConditionalSubscriber, InnerOperator, QueueSubscription {final ConditionalSubscriber actual;final Function mapper;boolean done;QueueSubscription s;MapFuseableConditionalSubscriber(ConditionalSubscriber actual,Function mapper) {this.actual = actual;this.mapper = mapper;}@SuppressWarnings("unchecked")@Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(this.s, s)) {this.s = (QueueSubscription) s;actual.onSubscribe(this);}}@Overridepublic void onNext(T t) {if (sourceMode == ASYNC) {actual.onNext(null);}else {R v;try {v = mapper.apply(t);if (v == null) {throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");}}catch (Throwable e) {return;}actual.onNext(v);}}@Overridepublic void request(long n) {s.request(n);}}

当调用 .subscribe(System.out::println) 的时候,这里传入的是一个方法引用。调用的是父类Mono的多个重载方法

public final Disposable subscribe(Consumer consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}public final Disposable subscribe(@Nullable Consumer consumer,@Nullable Consumer errorConsumer,@Nullable Runnable completeConsumer) {return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);}public final Disposable subscribe(@Nullable Consumer consumer,@Nullable Consumer errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer,completeConsumer, null, initialContext));}public final <E extends Subscriber> E subscribeWith(E subscriber) {// 这里将会调用子类的重写 subscribe 方法。subscribe(subscriber);return subscriber;}

传入的 Consumer 被封装成为了 LambdaMonoSubscriber,最终在 onNext 方法中调用 Consumer 的逻辑。

final class LambdaMonoSubscriber implements InnerConsumer, Disposable {// 两个核心方法@Overridepublic final void onSubscribe(Subscription s) {if (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {//。。。}else {s.request(Long.MAX_VALUE);}}}@Overridepublic final void onNext(T x) {if (consumer != null) {try {consumer.accept(x);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();doError(t);}}// 省略其他。。。}}

以上就是所有的关键类,跟我们前面的实现逻辑是一致的,流程也是一致,因此不再做过多的描述。

总结

本文通过自己实现接口的方式,揭示了响应式编程核心类 Mono 的基本原理。其核心接口为三个 Publisher,Subscriber,Subsription。核心方法为 Publisher#onSubscribe(Subscription),Subscriber#onSubscribe(Subscription),Subsription#request(long), Subscriber#onNext(T),围绕Publisher的构造函数以及这四个方法,依次构建了Publisher 链表, Subscriber 链表和 Subsription 链表。通过这三个链表构建和遍历,完成了数据的发布,订阅和流式处理逻辑。

但是本文未涉及如何完成"响应式",这个在后续的文章中会涉及。

参考

projectreactor官方网站

官方文档

Flux、Mono、Reactor 实战(史上最全)

二、subscribeOn和publishOn源码解析

响应式编程入门之 Project Reactor