当前位置: 首页 > news >正文

SpringBoot响应式编程(2)WebFlux入门

一、概述

1.1简介

简单来说,Webflux 是响应式编程的框架,与其对等的概念是 SpringMVC。两者的不同之处在于 Webflux 框架是异步非阻塞的,其可以通过较少的线程处理高并发请求。

WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步非阻塞的web响应式框架

底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统

优点:能使用少量资源处理大量请求;

以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程

现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式

1.2什么是异步 Servlet

在 Servlet3.0 之前,Servlet 采用 Thread-Per-Request 的方式处理 Http 请求,即每一次请求都是由某一个线程从头到尾负责处理。

如果一个请求需要进行 IO 操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待 IO 操作完成, 而 IO 操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,如果并发量很大的话,那肯定会造性能问题。

有了异步 Servlet 之后,后台 Servlet 的线程会被及时释放,释放之后又可以去接收新的请求,进而提高应用的并发能力。

1.3SSE

SSE 全称是 Server-Sent Events,它的作用和 WebSocket 的作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息,不同的是,WebSocket 是一种全双工通信协议,而 SSE 则是一种单工通信协议,即使用 SSE 只能服务器向浏览器推送信息流,浏览器如果向服务器发送信息,就是一个普通的 HTTP 请求。

使用 SSE,当服务端给客户端响应的时候,他不是发送一个一次性数据包,而是会发送一个数据流,这个时候客户端的连接不会关闭,会一直等待服务端发送过来的数据流,我们常见的视频播放其实就是这样的例子。

SSE 和 WebSocket 主要有如下区别:

  • SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。

  • SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。

  • SSE 默认支持断线重连,WebSocket 需要自己实现。

  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。

  • SSE 支持自定义发送的消息类型。

二、快速入门

2.0创建工程

为了演示方便,松哥这里就直接采用 Spring Boot 工程了,首先我们创建一个 Spring Boot 工程,需要注意的是,以往创建 Spring Boot 时我们都是选择 Spring Web 依赖,但是这次我们选择 Spring Reactive Web 依赖,如下图:

添加上这一个依赖就 OK 了。

这个时候创建好的 Spring Boot 项目,底层容器是 Netty 而不是我们之前广泛使用的 Tomcat 了。

2.1添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

2.2HttpHandler、HttpServer

public class FluxMainApplication {public static void main(String[] args) throws IOException {//快速自己编写一个能处理请求的服务器//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号HttpHandler handler = (ServerHttpRequest request,ServerHttpResponse response)->{URI uri = request.getURI();System.out.println(Thread.currentThread()+"请求进来:"+uri);//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
//            response.getHeaders(); //获取响应头
//            response.getCookies(); //获取Cookie
//            response.getStatusCode(); //获取响应状态码;
//            response.bufferFactory(); //buffer工厂
//            response.writeWith() //把xxx写出去
//            response.setComplete(); //响应结束//数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>//创建 响应数据的 DataBufferDataBufferFactory factory = response.bufferFactory();//数据BufferDataBuffer buffer = factory.wrap(new String(uri.toString() + " ==> Hello!").getBytes());// 需要一个 DataBuffer 的发布者return response.writeWith(Mono.just(buffer));};//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);//3、启动Netty服务器HttpServer.create().host("localhost").port(8080).handle(adapter) //用指定的处理器处理请求.bindNow(); //现在就绑定System.out.println("服务器启动完成....监听8080,接受请求");System.in.read();System.out.println("服务器停止....");}
}

2.3DispatcherHandler

SpringMVC: DispatcherServlet;

SpringWebFlux: DispatcherHandler

package com.yanyu.webflux.controller;import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.reactive.result.view.Rendering;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;/*** @author lfy* @Description* @create 2023-12-01 20:52*/
@ResponseBody@Controller
public class HelloController {//WebFlux: 向下兼容原来SpringMVC的大多数注解和API;@GetMapping("/hello")public String hello(@RequestParam(value = "key",required = false,defaultValue = "哈哈") String key,ServerWebExchange exchange,WebSession webSession,HttpMethod method,HttpEntity<String> entity,@RequestBody String s,FilePart file){//        file.transferTo() //零拷贝技术;ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();String name = method.name();Object aaa = webSession.getAttribute("aaa");webSession.getAttributes().put("aa","nn");return "Hello World!!! key="+key;}// Rendering:一种视图对象。@GetMapping("/bai")public Rendering render(){
//        Rendering.redirectTo("/aaa"); //重定向到当前项目根路径下的 aaareturn   Rendering.redirectTo("http://www.baidu.com").build();}//现在推荐的方式//1、返回单个数据Mono: Mono<Order>、User、String、Map//2、返回多个数据Flux: Flux<Order>//3、配合Flux,完成SSE: Server Send Event; 服务端事件推送@GetMapping("/haha")public Mono<String> haha(){//        ResponseEntity.status(305)
//                .header("aaa","bbb")
//                .contentType(MediaType.APPLICATION_CBOR)
//                .body("aaaa")
//                .return Mono.just(0).map(i-> 10/i).map(i->"哈哈-"+i);}@GetMapping("/hehe")public Flux<String> hehe(){return Flux.just("hehe1","hehe2");}//text/event-stream//SSE测试; chatgpt都在用; 服务端推送@GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> sse(){return Flux.range(1,10).map(i-> {//构建一个SSE对象return    ServerSentEvent.builder("ha-" + i).id(i + "").comment("hei-" + i).event("haha").build();}).delayElements(Duration.ofMillis(500));}//SpringMVC 以前怎么用,基本可以无缝切换。// 底层:需要自己开始编写响应式代码}

2.4错误处理

 @GetMapping("/haha")public Mono<String> haha(){//        ResponseEntity.status(305)
//                .header("aaa","bbb")
//                .contentType(MediaType.APPLICATION_CBOR)
//                .body("aaaa")
//                .return Mono.just(0).map(i-> 10/i).map(i->"哈哈-"+i);}
    @ExceptionHandler(ArithmeticException.class)public String error(ArithmeticException exception){System.out.println("发生了数学运算异常"+exception);//返回这些进行错误处理;
//        ProblemDetail:  建造者:声明式编程、链式调用
//        ErrorResponse : return "炸了,哈哈...";}

2.6常用注解

1、目标方法传参

Method Arguments :: Spring Framework

Controller method argument

Description

ServerWebExchange

封装了请求和响应对象的对象; 自定义获取数据、自定义响应

ServerHttpRequest, ServerHttpResponse

请求、响应

WebSession

访问Session对象

java.security.Principal

org.springframework.http.HttpMethod

请求方式

java.util.Locale

国际化

java.util.TimeZone + java.time.ZoneId

时区

@PathVariable

路径变量

@MatrixVariable

矩阵变量

@RequestParam

请求参数

@RequestHeader

请求头;

@CookieValue

获取Cookie

@RequestBody

获取请求体,Post、文件上传

HttpEntity<B>

封装后的请求对象

@RequestPart

获取文件上传的数据 multipart/form-data.

java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap.

Map、Model、ModelMap

@ModelAttribute

Errors, BindingResult

数据校验,封装错误

SessionStatus + class-level @SessionAttributes

UriComponentsBuilder

For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links.

@SessionAttribute

@RequestAttribute

转发请求的请求域数据

Any other argument

所有对象都能作为参数:

1、基本类型 ,等于标注@RequestParam

2、对象类型,等于标注 @ModelAttribute

2、返回值写法

sse和websocket区别:

  • SSE:单工;请求过去以后,等待服务端源源不断的数据
  • websocket:双工: 连接建立后,可以任何交互;

Controller method return value

Description

@ResponseBody

把响应数据写出去,如果是对象,可以自动转为json

HttpEntity<B>, ResponseEntity<B>

ResponseEntity:支持快捷自定义响应内容

HttpHeaders

没有响应内容,只有响应头

ErrorResponse

快速构建错误响应

ProblemDetail

SpringBoot3;

String

就是和以前的使用规则一样;

forward: 转发到一个地址

redirect: 重定向到一个地址

配合模板引擎

View

直接返回视图对象

java.util.Map, org.springframework.ui.Model

以前一样

@ModelAttribute

以前一样

Rendering

新版的页面跳转API; 不能标注 @ResponseBody 注解

void

仅代表响应完成信号

Flux<ServerSentEvent>, Observable<ServerSentEvent>, or other reactive type

使用 text/event-stream 完成SSE效果

Other return values

未在上述列表的其他返回值,都会当成给页面的数据;

2.7文件上传

@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata, @RequestPart("file-data") FilePart file) { // ...
}

2.8自定义Flux配置

WebFluxConfigurer

容器中注入这个类型的组件,重写底层逻辑

@Configuration
public class MyWebConfiguration {//配置底层@Beanpublic WebFluxConfigurer webFluxConfigurer(){return new WebFluxConfigurer() {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**").allowedHeaders("*").allowedMethods("*").allowedOrigins("localhost");}};}
}

2.9 Filter

@Component
public class MyWebFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();System.out.println("请求处理放行到目标方法之前...");Mono<Void> filter = chain.filter(exchange); //放行//流一旦经过某个操作就会变成新流Mono<Void> voidMono = filter.doOnError(err -> {System.out.println("目标方法异常以后...");}) // 目标方法发生异常后做事.doFinally(signalType -> {System.out.println("目标方法执行以后...");});// 目标方法执行之后//上面执行不花时间。return voidMono; //看清楚返回的是谁!!!}
}

三、WebFlux  CURD 实战(mongodb)

3.1概述

WebFlux 最为人所诟病的是数据库的支持问题,毕竟数据是一个应用的生命,我们接触的大部分应用程序都是有数据库的,而 WebFlux 在这一方面的支持行一直比较弱,这也是大家总是吐槽它的原因。

不过从 Spring5 开始,这一问题得到了一定程度的缓解。

Spring 官方在 Spring5 发布了响应式 Web 框架 Spring WebFlux 之后急需能够满足异步响应的数据库交互 API,不过由于缺乏标准和驱动,Pivotal 团队开始自己研究响应式关系型数据库连接 Reactive Relational Database Connectivity,并提出了 R2DBC 规范 API 用来评估可行性并讨论数据库厂商是否有兴趣支持响应式的异步非阻塞驱动程序。最早只有 PostgreSQL 、H2、MSSQL 三家数据库厂商,不过现在 MySQL 也加入进来了,这是一个极大的利好。目前 R2DBC 的最新版本是 0.9.0.RELEASE。

3.2相关概念

基于spring-data-mongodb-reactive响应式框架实现与mongodb的互交。ReactiveMongoRepository<T, ID>支持响应式编程并提供基础功能的接口,类似JpaRepository/BaseMapper接口。基于Reactive编程时,查询返回多条记录封装在`Flux<T>`而非`Flux<List<T>>`。 再通过collectList()方法将元素聚合为单一`Mono<List<T>>`类型聚合语句声明在Repository接口查询方法的@Aggregation注解中。java11不支持文本块,可读性太差了。mongodb日期时间以UTC计算,并自动转换本地时间存储。但在获取时,spring-data-mongodb会转换回本地时间。  
因此,涉及日期时间的必须通过业务逻辑操作。

3.3环境配置

maven依赖

yml依赖

spring:application:name: mongodb-examplesdata:mongodb:host: 47.96.254.46port: 27017database: testusername: mongopassword: '1213' # 密码按char[]处理。纯数字要加单引号,最好都加单引号authentication-database: admin
#       uri: mongodb://mongo:1213@192.168.1.8/test?authSource=admin # 等效连接auto-index-creation: true # 仅声明索引注解无效,必须显式声明。logging:level:root: warncom:example: debugpattern:console: '%-5level %C.%M[%line] - %msg%n'

3.4CRUD

实体类

@Document
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class user {@Idprivate String id;private String username;private String address;
}

Repository


@EnableMongoRepositories
public interface UserDao extends ReactiveMongoRepository<User,String> {
}

Controller

@RestController
@RequestMapping("/user")
public class UserController {@AutowiredUserDao userDao;@PostMapping("/add")public Mono<user> addUser(@RequestBody user user) {return userDao.save(user);}@GetMapping("/getall")public Flux<user> getAll() {return userDao.findAll();}@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<user> streamGetAll() {return userDao.findAll();}@DeleteMapping("/delete/{id}")public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {return userDao.findById(id).flatMap(user -> userDao.delete(user).then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))).defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND));}@PutMapping("/update")public Mono<ResponseEntity<user>> updateUser(@RequestBody user user) {return userDao.findById(user.getId()).flatMap(u -> userDao.save(user)).map(u->new ResponseEntity<user>(u,HttpStatus.OK)).defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND));}
}

四、WebFlux   实战(redis)

4.1概述

hkey的序列化不能使用StringRedisSerializer,否则long就无法序列化  
不使用data-repository,仅使用redistemplate手动操作redis时,一定在配置手动关repositories,
否则无法创建redisReferenceResolver对象,无法将ReactiveRedisTemplate类型转为RedisOperations类型  向hash添加键值对,hkey不存返回true,存在返回false,但是会修改覆盖值  

4.2环境配置

maven依赖

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-web</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>

yml配置

spring:application:name: reactive-redis-examplesredis:host: 116.63.13.220port: 3390database: 1username: defaultpassword: 2046jackson:default-property-inclusion: non_nulldata:redis:repositories:enabled: false # 关闭data-repository,使用redistemplatelogging:level:root: infocom:example: debugpattern:console: '%-5level %C.%M[%line] - %msg%n'

redis配置

@Configuration
@Slf4j
public class RedisConfig {@Beanpublic ReactiveRedisTemplate<String, Object> redisTemplate(ReactiveRedisConnectionFactory cf) {Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);// 创建jackson配置,对象ObjectMapper mapper = new ObjectMapper();// 序列化时,忽略空值属性。mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);// 反序列化时,忽略不存在属性。即,缓存中有而类中没有的属性,不会异常mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 序列化时,同时记录类型的全限定性类名。否则按Map反序列化PolymorphicTypeValidator ptv =BasicPolymorphicTypeValidator.builder().allowIfSubType(Object.class).build();mapper.activateDefaultTyping(ptv, ObjectMapper.DefaultTyping.NON_FINAL);// 按ISO字符串序列化/反序列化日期时间,而非对象mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);mapper.registerModule(new JavaTimeModule());serializer.setObjectMapper(mapper);RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =RedisSerializationContext.newSerializationContext();// hkey的序列化不能使用StringRedisSerializer,否则long就无法序列化builder.hashKey(serializer);builder.hashValue(serializer);builder.key(new StringRedisSerializer());builder.value(serializer);RedisSerializationContext<String, Object> context = builder.build();return new ReactiveRedisTemplate<>(cf, context);}}

4.3CRUD

实体类

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Item {private Long id;private int total;
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 文心快码(Baidu Comate)快速创建数据可视化图表
  • CSS的:host伪类:精确定位于Web组件的指南
  • 00_remipi_软件评估记录
  • 计算机基础知识复习8.13
  • 基于 HTTP构建 YUM网络源
  • 使用ITextRenderer导出PDF后无法打开问题,提示‘无法打开此文件‘
  • JVM性能监控工具
  • 实现异形(拱形)轮播图
  • 高性能内存对象缓存
  • 数据中心存储市场迎强劲反弹
  • 网络如何发送一个数据包
  • Ubuntu安装cuda
  • 【Qt开发】创建并打开子窗口(QWidget)的注意事项 禁止其他窗口点击、隐藏窗口、子窗口不退出的配置和解决方案
  • TypeScript 快速上⼿ (2)
  • SiLM5932SHO系列SiLM5932SHOCG-DG 12A/12A强劲驱动电流能力 支持主动短路保护功能(ASC)单通道隔离门极驱动器
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 0基础学习移动端适配
  • 2017 前端面试准备 - 收藏集 - 掘金
  • 4个实用的微服务测试策略
  • Cumulo 的 ClojureScript 模块已经成型
  • Linux CTF 逆向入门
  • Mysql5.6主从复制
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • Sublime Text 2/3 绑定Eclipse快捷键
  • SwizzleMethod 黑魔法
  • Unix命令
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • vue--为什么data属性必须是一个函数
  • webpack4 一点通
  • Work@Alibaba 阿里巴巴的企业应用构建之路
  • 开源SQL-on-Hadoop系统一览
  • 力扣(LeetCode)965
  • 三分钟教你同步 Visual Studio Code 设置
  • 微服务框架lagom
  • 我建了一个叫Hello World的项目
  • 容器镜像
  • ​MySQL主从复制一致性检测
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #gStore-weekly | gStore最新版本1.0之三角形计数函数的使用
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (附源码)springboot宠物管理系统 毕业设计 121654
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (转)重识new
  • (轉貼) UML中文FAQ (OO) (UML)
  • (自适应手机端)响应式服装服饰外贸企业网站模板
  • .net core控制台应用程序初识
  • .Net程序帮助文档制作
  • @DependsOn:解析 Spring 中的依赖关系之艺术
  • [ element-ui:table ] 设置table中某些行数据禁止被选中,通过selectable 定义方法解决
  • []利用定点式具实现:文件读取,完成不同进制之间的
  • [2016.7 test.5] T1
  • [BZOJ] 2006: [NOI2010]超级钢琴
  • [C#]winform利用seetaface6实现C#人脸检测活体检测口罩检测年龄预测性别判断眼睛状态检测