当前位置: 首页 > news >正文

Reactor 之 手把手教你 Spring Boot 整合 Reactor

Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFutureStream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

1 创建项目

使用 https://start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

然后导入 Reactor

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

2 集成 H2 数据库

application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

server.port=8081
################ H2 数据库 基础配置 ##############
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.url=jdbc:h2:~/user
spring.datasource.username=sa
spring.datasource.password=

spring.jpa.database=h2
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.path=/h2-console
spring.h2.console.enable=true

3 创建测试类

3.1 user 实体

建立简单数据操作实体 User。

import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * @Author: prepared
 * @Date: 2022/8/29 21:40
 */
@Data
@NoArgsConstructor
@Table(name = "t_user")
@Entity
public class User {

	@Id
	@GeneratedValue(strategy = GenerationType.AUTO)
	private Long id;

	private String userName;

	private int age;

	private String sex;

	public User(String userName, int age, String sex) {
		this.userName = userName;
		this.age = age;
		this.sex = sex;
	}
}

3.2 UserRepository

数据模型层使用 JPA 框架。

import com.prepared.user.domain.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}

3.3 UserService

service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

doOnError 监控异常情况;

doFinally 监控整体执行情况,如:耗时、调用量监控等。

import com.prepared.user.dao.UserRepository;
import com.prepared.user.domain.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;
import java.util.List;

/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Service
public class UserService {

	private Logger logger = LoggerFactory.getLogger(UserService.class);

	@Resource
	private UserRepository userRepository;

	public Mono<Boolean> save(User user) {
		long startTime = System.currentTimeMillis();
		return Mono.fromSupplier(() -> {
					return userRepository.save(user) != null;
				})
				.doOnError(e -> {
					// 打印异常日志&增加监控(自行处理)
					logger.error("save.user.error, user={}, e", user, e);
				})
				.doFinally(e -> {
					// 耗时 & 整体健康
					logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
				});
	}

	public Mono<User> findById(Long id) {
		long startTime = System.currentTimeMillis();
		return Mono.fromSupplier(() -> {
					return userRepository.getReferenceById(id);
				}).doOnError(e -> {
					// 打印异常日志&增加监控(自行处理)
					logger.error("findById.user.error, id={}, e", id, e);
				})
				.doFinally(e -> {
					// 耗时 & 整体健康
					logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);
				});
	}

	public Mono<List<User>> list() {
		long startTime = System.currentTimeMillis();
		return Mono.fromSupplier(() -> {
					return userRepository.findAll();
				}).doOnError(e -> {
					// 打印异常日志&增加监控(自行处理)
					logger.error("list.user.error, e", e);
				})
				.doFinally(e -> {
					// 耗时 & 整体健康
					logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
				});
	}
  
  public Flux<User> listFlux() {
		long startTime = System.currentTimeMillis();
		return Flux.fromIterable(userRepository.findAll())
				.doOnError(e -> {
					// 打印异常日志&增加监控(自行处理)
					logger.error("list.user.error, e", e);
				})
				.doFinally(e -> {
					// 耗时 & 整体健康
					logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
				});
	}
}

3.4 UserController

controller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

返回List可以使用Mono<List<User>> ,也可以使用 Flux<User>

  • Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素
  • Flux<T> 是一个标准的 Publisher<T>,表示为发出 0 到 N 个元素的异步序列
import com.prepared.user.domain.User;
import com.prepared.user.service.UserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: prepared
 * @Date: 2022/8/29 21:47
 */
@RestController
public class UserController {

	@Resource
	private UserService userService;

	@RequestMapping("/add")
	public Mono<Boolean> add() {
		User user = new User("xiaoming", 10, "F");
		return userService.save(user) ;

	}

	@RequestMapping("/list")
	public Mono<List<User>> list() {
		return userService.list();
	}
}

	@RequestMapping("/listFlux")
	public Flux<User> listFlux() {
		return userService.listFlux();
	}

3.5 SpringReactorApplication 添加注解支持

Application 启动类添加注解 @EnableJpaRepositories

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/**
 * Hello world!
 */
@SpringBootApplication
@EnableJpaRepositories
public class SpringReactorApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringReactorApplication.class, args);
	}
}

测试

启动项目,访问 localhost:8081/add,正常返回 true。

查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

后台日志:

2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181, 

执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。

总结

响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

  1. Futureget() 方法;
  2. Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
  3. 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度。

下一篇,讲解如何将熔断、限流框架 resilience4j 整合到项目中,敬请期待。

相关文章:

  • 【42STL-函数对象使用详情】
  • LVS-Nat模式实战
  • java毕业设计基于的测试项目管理平台Mybatis+系统+数据库+调试部署
  • 对于钾,钙,锌,铁,钠,镁金属离子荧光探针的详细知识整理如下
  • Soft Actor-Critic(SAC算法)
  • C语言的头文件的处理
  • 使用 DM binary 部署 DM 集群
  • iOS小技能:RSA签名、验签、加密、解密的原理
  • 使用 Argon2 的 Java 密码散列
  • 基于多次傅里叶变换算法的快速相位解包裹算法研究
  • Mybatis-Plus用纯注解搞定一对多查询
  • 6.CF431E Chemistry Experiment 权值线段树+二分
  • 基于RFID技术的智能书架系统
  • 1014 Circles of Friends
  • Linux 下进程间通讯之内存映射详解
  • CSS相对定位
  • Docker: 容器互访的三种方式
  • Flannel解读
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • Javascript编码规范
  • k个最大的数及变种小结
  • MySQL主从复制读写分离及奇怪的问题
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Vue.js 移动端适配之 vw 解决方案
  • yii2权限控制rbac之rule详细讲解
  • 阿里云应用高可用服务公测发布
  • 编写高质量JavaScript代码之并发
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 盘点那些不知名却常用的 Git 操作
  • 浅谈Kotlin实战篇之自定义View图片圆角简单应用(一)
  • 嵌入式文件系统
  • 如何学习JavaEE,项目又该如何做?
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 移动端 h5开发相关内容总结(三)
  • 在Unity中实现一个简单的消息管理器
  • - 转 Ext2.0 form使用实例
  • Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九) ...
  • 从如何停掉 Promise 链说起
  • 微龛半导体获数千万Pre-A轮融资,投资方为国中创投 ...
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • #define 用法
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • (1) caustics\
  • (env: Windows,mp,1.06.2308310; lib: 3.2.4) uniapp微信小程序
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (zt)最盛行的警世狂言(爆笑)
  • (全注解开发)学习Spring-MVC的第三天
  • (十一)c52学习之旅-动态数码管
  • (顺序)容器的好伴侣 --- 容器适配器
  • (转载)(官方)UE4--图像编程----着色器开发
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .NET Core、DNX、DNU、DNVM、MVC6学习资料
  • .NET Core引入性能分析引导优化
  • .NET MVC 验证码