webflux介绍
Spring Boot 2.0
spring.io 官网有句醒目的话是:
BUILD ANYTHING WITH SPRING BOOT
Spring Boot (Boot 顾名思义,是引导的意思)框架是用于简化 Spring 应用从搭建到开发的过程。应用开箱即用,只要通过一个指令,包括命令行 java -jar 、SpringApplication 应用启动类 、 Spring Boot Maven 插件等,就可以启动应用了。另外,Spring Boot 强调只需要很少的配置文件,所以在开发生产级 Spring 应用中,让开发变得更加高效和简易。目前,Spring Boot 版本是 2.x 版本。Spring Boot 包括 WebFlux。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z0QiBq9N-1647350877518)(//upload-images.jianshu.io/upload_images/13587608-2108800ca0602799.png?imageMogr2/auto-orient/strip|imageView2/2/w/1200/format/webp)]
传统的以SpringMVC为代表的webmvc技术使用的是同步阻塞式IO模型
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-we3JkRy0-1647350877522)(//upload-images.jianshu.io/upload_images/13587608-e87cc1031dff253b.png?imageMogr2/auto-orient/strip|imageView2/2/w/891/format/webp)]
而Spring WebFlux是一个异步非阻塞式IO模型,可以用少量的容器线程支撑大量的并发访问,所以Spring WebFlux可以提升吞吐量和伸缩性,但是接口的响应时间并不会缩短,其处理结果还是得由worker线程处理完成之后在返回给请求。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rtkYH6rj-1647350877523)(//upload-images.jianshu.io/upload_images/13587608-0c9ab223c04faeb0.png?imageMogr2/auto-orient/strip|imageView2/2/w/1200/format/webp)]
webflux应用场景
适合IO密集型、磁盘IO密集、网络IO密集等服务场景,比如微服务网关,就可以使用webflux技术来显著的提升网关对下游服务的吞吐量,spring cloud gateway就使用了webflux这门技术
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o3urQyoN-1647350877524)(//upload-images.jianshu.io/upload_images/13587608-371f3ac43c0227b1.png?imageMogr2/auto-orient/strip|imageView2/2/w/861/format/webp)]
Spring Boot 2.0 WebFlux
了解 WebFlux,首先了解下什么是 Reactive Streams。Reactive Streams 是 JVM 中面向流的库标准和规范:
- 处理可能无限数量的元素
- 按顺序处理
- 组件之间异步传递
- 强制性非阻塞背压(Backpressure)
Backpressure(背压)
背压是一种常用策略,使得发布者拥有无限制的缓冲区存储元素,用于确保发布者发布元素太快时,不会去压制订阅者。
Reactive Streams(响应式流)
一般由以下组成:
- publisher:发布者,发布元素到订阅者
- subscriber:订阅者,消费元素
- subscription:订阅,在发布者中,订阅被创建时,将与订阅者共享
- processor:处理器,发布者与订阅者之间处理数据,包含了发布者与订阅者的共同体
publisher接口规范
public interface Publisher {void subscribe(Subscriber var1);}
subscriber接口规范
public interface Subscriber {void onSubscribe(Subscription var1);void onNext(T var1);void onError(Throwable var1);void onComplete();}
subscription接口规范
public interface Subscription {void request(long var1);void cancel();}
processor接口规范
public interface Processor extends Subscriber, Publisher {}
响应式编程
有了 Reactive Streams 这种标准和规范,利用规范可以进行响应式编程。那再了解下什么是 Reactive programming 响应式编程。响应式编程是基于异步和事件驱动的非阻塞程序,只是垂直通过在 JVM 内启动少量线程扩展,而不是水平通过集群扩展。这就是一个编程范例,具体项目中如何体现呢?
响应式项目编程实战中,通过基于 Reactive Streams 规范实现的框架 Reactor 去实战。Reactor 一般提供两种响应式 API :
- Mono:实现发布者,并返回 0 或 1 个元素
- Flux:实现发布者,并返回 N 个元素
Spring Webflux
Spring Boot Webflux 就是基于 Reactor 实现的。Spring Boot 2.0 包括一个新的 spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。
Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是使用其功能性端点方式。
Spring Boot 2.0 WebFlux 特性
常用的 Spring Boot 2.0 WebFlux 生产的特性如下:
- 响应式 API
- 编程模型
- 适用性
- 内嵌容器
- Starter 组件
还有对日志、Web、消息、测试及扩展等支持。
响应式 API
Reactor 框架是 Spring Boot Webflux 响应库依赖,通过 Reactive Streams 并与其他响应库交互。提供了 两种响应式 API:Mono 和 Flux。一般是将 Publisher 作为输入,在框架内部转换成 Reactor 类型并处理逻辑,然后返回 Flux 或 Mono 作为输出。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!
spring webflux和spring mvc的异同点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ENh7FPTU-1647350877526)(//upload-images.jianshu.io/upload_images/13587608-bd7cea70050a4c66.png?imageMogr2/auto-orient/strip|imageView2/2/w/800/format/webp)]
一图就很明确了,WebFlux 和 MVC 有交集,方便大家迁移。但是注意:
- MVC 能满足场景的,就不需要更改为 WebFlux。
- 要注意容器的支持,可以看看下面内嵌容器的支持。
- 微服务体系结构,WebFlux 和 MVC 可以混合使用。尤其开发 IO 密集型服务的时候,选择 WebFlux 去实现。
- spring mvc是一个命令式的编程方式采用同步阻塞方式,方便开发人员编写代码和调试;spring webflux调试会非常不方便
- JDBC连接池和JPA等技术还是阻塞模型,传统的关系型数据库如MySQL也不支持非阻塞的方式获取数据,目前只有非关系型数据库如Redis、Mongodb支持非阻塞方式获取数据
编程模型
Spring 5 web 模块包含了 Spring WebFlux 的 HTTP 抽象。类似 Servlet API , WebFlux 提供了 WebHandler API 去定义非阻塞 API 抽象接口。可以选择以下两种编程模型实现:
- 注解控制层。和 MVC 保持一致,WebFlux 也支持响应性 @RequestBody 注解。
- 功能性端点。基于 lambda 轻量级编程模型,用来路由和处理请求的小工具。和上面最大的区别就是,这种模型,全程控制了请求 – 响应的生命流程
内嵌容器
跟 Spring Boot 大框架一样启动应用,但 WebFlux 默认是通过 Netty 启动,并且自动设置了默认端口为 8080。另外还提供了对 Jetty、Undertow 等容器的支持。开发者自行在添加对应的容器 Starter 组件依赖,即可配置并使用对应内嵌容器实例。
但是要注意,必须是 Servlet 3.1+ 容器,如 Tomcat、Jetty;或者非 Servlet 容器,如 Netty 和 Undertow。
Netty优点
- API使用简单、易上手
- 功能强大、支持多种主流协议
- 定制能力强、可扩展性高
- 性能高、综合性能最优
- 成熟稳定、久经考验
- 社区活跃、学习资料多
Netty selector模型
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mRe8zuMb-1647350877527)(//upload-images.jianshu.io/upload_images/13587608-6439849871f5493c.png?imageMogr2/auto-orient/strip|imageView2/2/w/647/format/webp)]
Reactor指南
- Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的
- 实现了 Reactive Programming 思想,符合Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术
- 侧重于server端的响应式编程框架
- Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!
Java原有的异步编程方式
- Callback:异步方法采用一个callback作为参数,当结果出来后回调这个callback,例如swings的EventListener
- Future:异步方法返回一个Future,此时结果并不是立刻可以拿到,需要处理结束之后才可以使用
Future局限
- 多个Future组合不易
- 调用Future#get时仍然会阻塞
- 缺乏对多个值以及进一步的出错处理
Reactor的Publisher
- Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的响应式序列。
- Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的结果。
Flux介绍
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs3Mqhos-1647350877528)(//upload-images.jianshu.io/upload_images/13587608-2635442505128957.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
- Flux是一个标准Publisher,表示0到N个发射项的异步序列,可选地以完成信号或错误终止。与Reactive Streams规范中一样,这三种类型的信号转换为对下游订阅者的onNext、onComplete或onError方法的调用。
- 在这种大范围的可能信号中,Flux是通用的reactive 类型。注意,所有事件,甚至终止事件,都是可选的:没有onNext事件,但是onComplete事件表示一个空的有限序列,但是移除onComplete并且您有一个无限的空序列(除了关于取消的测试之外,没有特别有用)。同样,无限序列不一定是空的。例如,Flux.interval(Duration) 产生一个Flux,它是无限的,从时钟发出规则的数据。
Mono介绍
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tuGMtq94-1647350877529)(//upload-images.jianshu.io/upload_images/13587608-dd24951492c7fe52.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
- Mono是一个专门的Publisher,它最多发出一个项,然后可选地以onComplete信号或onError信号结束。
- 它只提供了可用于Flux的操作符的子集,并且一些操作符(特别是那些将Mono与另一个发布者组合的操作符)切换到Flux。
- 例如,Mono#concatWith(Publisher)返回一个Flux ,而Mono#then(Mono)则返回另一个Mono。
- 注意,Mono可以用于表示只有完成概念(类似于Runnable)的无值异步进程。若要创建一个,请使用Mono。
publisher订阅
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NOt0wadI-1647350877530)(//upload-images.jianshu.io/upload_images/13587608-24769a4237bf39c0.png?imageMogr2/auto-orient/strip|imageView2/2/w/1123/format/webp)]
reactor实践
- 首先maven工厂引入pom
org.springframework.bootspring-boot-starter-webflux
@RunWith(SpringRunner.class)@SpringBootTestpublic class ApplicationTest {@Testpublic void testReactor(){Flux flux = Flux.just(1, 2, 3, 4, 5, 6);Mono mono = Mono.just(1);Integer[] arr = {1,2,3,4,5,6};Flux flux1 = Flux.fromArray(arr);List list = Arrays.asList(1, 2, 3, 4, 5, 6);Flux flux2 = Flux.fromIterable(list);Flux flux3 = Flux.from(flux);Flux flux4 = Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6));flux.subscribe();flux1.subscribe(System.out::println);flux2.subscribe(System.out::println,System.err::println);flux3.subscribe(System.out::println,System.err::println,() -> System.out.println("complete"));flux4.subscribe(System.out::println,System.err::println,() -> System.out.println("complete"),subscription -> subscription.request(3));flux4.subscribe(new DemoSubscriber());}class DemoSubscriber extends BaseSubscriber{@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribe");subscription.request(1);}@Overrideprotected void hookOnNext(Integer value) {if(value == 4){//背压,通知数据源,不要发送数据了cancel();}System.out.println(value);request(1);}}}
Reactor操作符
map – 元素映射为新元素
map操作可以将数据元素进行转换/映射,得到一个新元素。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7mIXxVVZ-1647350877531)(//upload-images.jianshu.io/upload_images/13587608-368b8ab8b5a692cd.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
flatMap – 元素映射为流
flatMap操作可以将每个数据元素转换/映射为一个流,然后将这些流合并为一个大的数据流。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nGHXbvoO-1647350877532)(//upload-images.jianshu.io/upload_images/13587608-cdfb29dbd34ffe8f.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
filter – 过滤
filter操作可以对数据元素进行筛选。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vdv4loSy-1647350877532)(//upload-images.jianshu.io/upload_images/13587608-033ebaeee6d17642.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
zip – 一对一合并
看到zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来。zip有多个方法变体,我们介绍一个最常见的二合一的。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-45QXlchE-1647350877533)(//upload-images.jianshu.io/upload_images/13587608-84fafc95687bd6ca.png?imageMogr2/auto-orient/strip|imageView2/2/w/640/format/webp)]
更多
Reactor中提供了非常丰富的操作符,除了以上几个常见的,还有:
- 用于编程方式自定义生成数据流的
create
和generate
等及其变体方法; - 用于“无副作用的peek”场景的
doOnNext
、doOnError
、doOncomplete
、doOnSubscribe
、doOnCancel
等及其变体方法; - 用于数据流转换的
when
、and/or
、merge
、concat
、collect
、count
、repeat
等及其变体方法; - 用于过滤/拣选的
take
、first
、last
、sample
、skip
、limitRequest
等及其变体方法; - 用于错误处理的
timeout
、onErrorReturn
、onErrorResume
、doFinally
、retryWhen
等及其变体方法; - 用于分批的
window
、buffer
、group
等及其变体方法; - 用于线程调度的
publishOn
和subscribeOn
方法。
使用这些操作符,你几乎可以搭建出能够进行任何业务需求的数据处理管道/流水线。
抱歉以上这些暂时不能一一介绍,更多详情请参考JavaDoc
reactor和java8 stream区别
- 形似而神不似
- reactor:push模式,服务端推送数据给客户端
- java8 stream:pull模式,客户端主动向服务端请求数据
Reactor线程模型
Reactor创建线程的方式
- Schedulers.immediate():当前线程
- Schedulers.single():可重用的单线程,注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle();
- Schedulers.elastic():弹性线程池,它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
- Schedulers.parallel():固定大小线程池,所创建线程池的大小与CPU个数等同
- Schedulers.fromExecutorService(ExecutorService):自定义线程池,基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)
线程模型
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OoFgSIEN-1647350877533)(//upload-images.jianshu.io/upload_images/13587608-b39ed01cd59ba6e1.png?imageMogr2/auto-orient/strip|imageView2/2/w/1172/format/webp)]
线程切换实践
@RunWith(SpringRunner.class)@SpringBootTestpublic class ApplicationTest {@Testpublic void testReactor() throws InterruptedException {Flux flux = Flux.just(1, 2, 3, 4, 5, 6);flux.map(i -> {System.out.println(Thread.currentThread().getName()+"-map1");return i * 3;}).publishOn(Schedulers.elastic()).map(i -> {System.out.println(Thread.currentThread().getName()+"-map2");return i / 3;}).subscribeOn(Schedulers.parallel()).subscribe(i -> System.out.println(Thread.currentThread().getName()+"-" + i));Thread.sleep(10000);}}
线程切换总结
- publishOn:它将上游信号传给下游,同时改变后续的操作符的执行所在线程,直到下一个publishOn出现在这个链上
- subscribeOn:作用于向上的订阅链,无论处于操作链的什么位置,它都会影响到源头的线程执行环境,但不会影响到后续的publishOn