一、什么是响应式编程
1.1 什么是WebFlux
WebFlux是从Spring Framework5.0以后开始引入的响应式web编程框架。与传统的Spring mvc不同WebFlux不需要Servlet API,在完全异步且无阻塞的通过Reactor项目实现Reactive Streams 规范。
WebFlux可以在有限资源下提高系统的吞吐量和伸缩性,这意味着在资源相同的情况下WebFlux可以处理更多的请求。
1.2 MVC和WebFlux的比较
(1) 工作方式:
mvc的工作流程:主线程接收到请求(request)-> ….. -> 返回数据. 整个过程是单线程阻塞的,在处理好数据后才返回数据,如果用户请求比较多,那么吞吐量就比较低。
(2)WebFlux:
WebFlux的工作流程:主线程得到请求-> 立即返回数据与函数的组合(Mono或Flux) -> 开启一个新Work线程准备数据 -> 执行业务操作 –> Work线程工作完成 —> 返回数据。
区别 | Spring mvc | Spring WebFlux |
地址映射 | @Controller @RequestMapping 等 | Router Functions 提供函数式的API,用于创建Router Handler Filter |
数据流 | Servlet API | Reactive Streams: 一种支持背压的异步数据量标准。WebFlux默认使用的是Reactor。 |
容器 | Tomcat Jetty Undertow | Tomcat Jetty Netty Undertow |
IO 模型 | 同步的、阻塞的IO | 异步非阻塞的IO |
吞吐量 | 低 | 高 |
数据库 | Sql NoSql | 支持NoSql,不支持Sql |
请求和响应 | HttpServletRequest和HttpServletResponse | ServletResponse和ServletRequest |
业务处理性能 | 相同 | 相同 |
2. 认识Mono和Flux
2.1 什么是Mono和Flux
Mono和Flux是Reactor中的两个基本概念。
Mono和Flux都是事件发布者,为消费者提供订阅接口。当有事件发生时,Mono和Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程模型。
Mono和Flux用于处理异步数据流,它不是MVC中直接返回String或Object,而是将异步数据流包装成Mono或Flux对象。
2.2 Mono和Flux的区别
Flux可以发送多个item(例如:列表)。这些item可以经过若干算子(operators)后才被订阅。Mono只能发送一个item(例如:根据id查询)
Mono 主要用于返回单个数据,Flux用于返回多个数据。
二、开发WebFlux的流程:
2.1 配置WebFlux依赖:
org.springframework.boot spring-boot-starter-webflux
2.2 编写控制器:
mport org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import reactor.core.publisher.Mono;@RestControllerpublic class HelloController { @GetMapping("/") public Mono hello(){ return Mono.just("Hello WebFlux Test dev!"); }}
2.3 添加mongo依赖并创建实体类:
org.springframework.boot spring-boot-starter-data-mongodb-reactive
@Data@AllArgsConstructor@NoArgsConstructor@Documentpublic class User { @Id private String id; private String name; private Integer age;}
2.4 创建DAO
import com.example.webfluxdemo.entity.User;import org.springframework.data.mongodb.repository.ReactiveMongoRepository;public interface UserMongoDao extends ReactiveMongoRepository {}
2.5 编写Handler:
import org.springframework.http.MediaType;import org.springframework.stereotype.Component;import org.springframework.web.reactive.function.server.ServerRequest;import org.springframework.web.reactive.function.server.ServerResponse;import reactor.core.publisher.Mono;@Componentpublic class HelloWorldHandler { public Mono sayHello(ServerRequest serverRequest){ return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN) .body(Mono.just("This is WebFlux Demo!"),String.class); }}
2.6 编写路由:
@Configurationpublic class Router { @Resource private HelloWorldHandler helloWorldHandler; @Bean public RouterFunction getString(){ return route(GET("/hello"),req->helloWorldHandler.sayHello(req)); }}
2.7 编写控制器:
mport com.example.webfluxdemo.dao.UserMongoDao;import com.example.webfluxdemo.entity.User;import org.springframework.http.HttpStatus;import org.springframework.http.MediaType;import org.springframework.http.ResponseEntity;import org.springframework.web.bind.annotation.*;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.time.Duration;import java.util.HashMap;import java.util.Map;import java.util.stream.Collectors;@RestController@RequestMapping("/user")public class UserController { @Resource private UserMongoDao userMongoDao; @GetMapping(value ="/list") public Flux findAll(){ return userMongoDao.findAll(); } @PostMapping("") public Mono create( User user){ return this.userMongoDao.save(user); } @GetMapping("/{id}") public Mono<ResponseEntity> getUserById(@PathVariable("id") String id){ return this.userMongoDao.findById(id) .map(getUser -> ResponseEntity.ok(getUser)) .defaultIfEmpty(ResponseEntity.notFound().build()); } @DeleteMapping("/{id}") public Mono<ResponseEntity> delete(@PathVariable("id")String id){ return userMongoDao.findById(id) .flatMap(existingUser -> userMongoDao.delete(existingUser) .then(Mono.just(new ResponseEntity(HttpStatus.OK))) ) .defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND)); } @PutMapping("/{id}") public Mono update(@PathVariable("id")String id,User user){ return this.userMongoDao.findById(id) .flatMap(existingUser-> { existingUser.setName(user.getName()); existingUser.setAge(user.getAge()); return userMongoDao.save(user); }) .map(update-> new ResponseEntity(update,HttpStatus.OK)) .defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND)); } @GetMapping(value ="/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux getAll(){ return userMongoDao.findAll().delayElements(Duration.ofSeconds(1)); }}