使用disruptor队列实现本地异步消费
disruptor不同于其他的消息队列中间件,不需要额外安装消息中间件服务软件;直接在项目中引入依赖包,就可以在本地实现生产者和消费者模式,但是不能跨项目、分布式地进行数据传输同步。像rabbitmq、rocketmq、activemq和kafka这类消息中间件可以称为分布式的消息中间件,可以跨项目、跨服务器使用;而disruptor是本地式(进程内)的消息队列,不具备跨项目分布式的能力。
接下来详细介绍disruptor的用法;首先在项目中引入依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
然后定义一个需要作为传输数据的对象,可以是任意的java对象
例如:
package com.dcboot.module.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dcboot.base.config.dao.mapping.BaseModel;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * Master record of an enterprise, mapped to the {@code enterprise_info} table.
 *
 * <p>This is also the event type carried through the Disruptor ring buffer in this project.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/7/27 15:16
 */
@Data
@EqualsAndHashCode(callSuper = true)
@TableName("enterprise_info")
@ApiModel("com-dcboot-module-entity-EnterpriseInfo")
public class EnterpriseInfo extends BaseModel<EnterpriseInfo> implements Serializable {

    // Column-name constants for building queries without string literals.
    // NOTE(review): the id / create_* / update_* columns have no field in this class;
    // presumably they are declared on BaseModel - confirm.
    public static final String COL_ID = "id";
    public static final String COL_ENTERPRISE_NAME = "enterprise_name";
    public static final String COL_USCC = "uscc";
    public static final String COL_LEGAL_REPRESENTATIVE = "legal_representative";
    public static final String COL_INDUSTRY = "industry";
    public static final String COL_TOWN_STREET = "town_street";
    public static final String COL_REGISTRY_DATE = "registry_date";
    public static final String COL_CREATE_TIME = "create_time";
    public static final String COL_CREATE_BY = "create_by";
    public static final String COL_UPDATE_TIME = "update_time";
    public static final String COL_UPDATE_BY = "update_by";

    private static final long serialVersionUID = 4719777992811345898L;

    /** Enterprise name. */
    @ApiModelProperty("企业名称")
    @TableField("enterprise_name")
    private String enterpriseName;

    /** Unified social credit code (USCC). */
    @ApiModelProperty("统一社会信用代码")
    @TableField("uscc")
    private String uscc;

    /** Legal representative. */
    @ApiModelProperty("法定代表人")
    @TableField("legal_representative")
    private String legalRepresentative;

    /** Industry the enterprise belongs to. */
    @ApiModelProperty("所属行业")
    @TableField("industry")
    private String industry;

    /** Town / sub-district the enterprise is located in. */
    @ApiModelProperty("属地镇街")
    @TableField("town_street")
    private String townStreet;

    /** Registration date. */
    @ApiModelProperty("注册日期")
    @TableField("registry_date")
    private LocalDate registryDate;
}
然后创建对应的对象工厂类
package com.dcboot.module.manage.disruptor.factory;
import com.dcboot.module.entity.EnterpriseInfo;
import com.lmax.disruptor.EventFactory;
/**
 * Event factory handed to the Disruptor so it can create the {@link EnterpriseInfo}
 * instances that occupy the ring-buffer slots.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/8/9 10:00
 */
public class EnterpriseEventFactory implements EventFactory<EnterpriseInfo> {

    /**
     * Creates one empty event object.
     *
     * @return a new, unpopulated {@link EnterpriseInfo}
     */
    @Override
    public EnterpriseInfo newInstance() {
        final EnterpriseInfo emptyEvent = new EnterpriseInfo();
        return emptyEvent;
    }
}
然后创建消费者(事件处理器)
package com.dcboot.module.manage.disruptor.handler;
import cn.hutool.json.JSONUtil;
import com.dcboot.module.entity.EnterpriseInfo;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/**
* @author xiaomifeng1010
* @version 1.0
* @date: 2022/8/9 10:01
* @Description
*/
@Slf4j
public class EnterpriseLibIndexEventHandler implements EventHandler<EnterpriseInfo> , WorkHandler<EnterpriseInfo> {
@Override
public void onEvent(EnterpriseInfo enterpriseInfo, long sequence, boolean endOfBatch) {
try {
EnterpriseIndexLibMapper enterpriseIndexLibMapper = SpringUtil.getBean(EnterpriseIndexLibMapper.class);
EnterpriseIndexLib enterpriseIndexLib = new EnterpriseIndexLib();
BeanUtils.copyProperties(enterpriseInfo, enterpriseIndexLib);
log.info("消费到对象enterpriseInfo:{}", JSONUtil.toJsonPrettyStr(enterpriseInfo));
LocalDateTime now = LocalDateTime.now();
enterpriseIndexLib.setCreateTime(now);
enterpriseIndexLib.setUpdateTime(now);
enterpriseIndexLib.setId(String.valueOf(enterpriseInfo.getId()));
log.info("sequence:{}",sequence);
log.info("enterpriseIndexLib:{}",JSONUtil.toJsonPrettyStr(enterpriseIndexLib));
log.info("开始往es中插入数据");
Integer integer = enterpriseIndexLibMapper.insert(enterpriseIndexLib);
String indexName = EnterpriseIndexLib.class.getAnnotation(IndexName.class).value();
log.info("成功插入{}条数据到ES的{}索引",integer,indexName);
} catch (Exception e) {
log.error("获取enterpriseInfo出错{}",e);
}
}
@Override
public void onEvent(EnterpriseInfo enterpriseInfo) {
try {
EnterpriseIndexLibMapper enterpriseIndexLibMapper = SpringUtil.getBean(EnterpriseIndexLibMapper.class);
EnterpriseIndexLib enterpriseIndexLib = new EnterpriseIndexLib();
BeanUtils.copyProperties(enterpriseInfo, enterpriseIndexLib);
log.info("消费到对象enterpriseInfo:{}", JSONUtil.toJsonPrettyStr(enterpriseInfo));
LocalDateTime now = LocalDateTime.now();
enterpriseIndexLib.setCreateTime(now);
enterpriseIndexLib.setUpdateTime(now);
enterpriseIndexLib.setId(String.valueOf(enterpriseInfo.getId()));
log.info("enterpriseIndexLib:{}",JSONUtil.toJsonPrettyStr(enterpriseIndexLib));
log.info("开始往es中插入数据");
Integer integer = enterpriseIndexLibMapper.insert(enterpriseIndexLib);
String indexName = EnterpriseIndexLib.class.getAnnotation(IndexName.class).value();
log.info("成功插入{}条数据到ES的{}索引",integer,indexName);
} catch (Exception e) {
log.error("获取enterpriseInfo出错{}",e);
}
}
}
注意实现了EventHandler接口的消费者,只能是普通的消费者,如果在ringbuffer中指定消费者时,即使指定了多个消费者,本质上实现的是消息广播模式,同一条消息,多个消费者同时消费,也就是说虽然有多个消费者(可以配置成多个消费者是多个线程),但实际上多个消费者同时消费的是同一个消息,不是多个消费者同时处理不同的多条消息,如果是要多个消费者同时处理的是多个不同的消息,则需要实现WorkHandler接口
接下来创建一个disruptor配置类
package com.dcboot.module.manage.disruptor.configuration;
import com.dcboot.module.entity.EnterpriseInfo;
import com.dcboot.module.manage.disruptor.factory.EnterpriseEventFactory;
import com.dcboot.module.manage.disruptor.handler.EnterpriseLibIndexEventHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.*;
/**
 * Spring configuration that builds and starts the Disruptor which hands
 * {@link EnterpriseInfo} events to a pool of {@link EnterpriseLibIndexEventHandler} workers.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/8/9 10:25
 */
@Configuration
public class DisruptorConfig {

    /**
     * Ring-buffer capacity in slots (events), not bytes. Disruptor requires a power of two;
     * any other value is rejected when the Disruptor is constructed.
     */
    private static final int BUFFER_SIZE = 1024 * 256;

    /** Number of worker handlers registered in the worker pool. */
    private static final int WORKER_COUNT = 10;

    /**
     * Builds and starts the Disruptor.
     *
     * <p>It is exposed as a bean with {@code destroyMethod = "shutdown"} so that the consumer
     * threads are stopped when the Spring context closes instead of outliving it.
     *
     * @return the started Disruptor
     */
    @Bean(destroyMethod = "shutdown")
    public Disruptor<EnterpriseInfo> enterpriseInfoDisruptor() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("queue-thread-pool-%d").build();

        // MULTI rather than SINGLE: the ring buffer bean is injected into several publishers
        // (a REST controller and a scheduled task), so next()/publish() can be called from
        // more than one thread at a time. The single-producer sequencer is not safe for that.
        Disruptor<EnterpriseInfo> disruptor = new Disruptor<>(new EnterpriseEventFactory(), BUFFER_SIZE,
                namedThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy());

        EnterpriseLibIndexEventHandler[] workers = new EnterpriseLibIndexEventHandler[WORKER_COUNT];
        for (int i = 0; i < WORKER_COUNT; i++) {
            workers[i] = new EnterpriseLibIndexEventHandler();
        }
        // Worker pool: each event is handled by exactly one of the workers,
        // unlike handleEventsWith(...), where every handler sees every event.
        disruptor.handleEventsWithWorkerPool(workers);

        disruptor.start();
        return disruptor;
    }

    /**
     * Exposes the ring buffer that producers publish {@link EnterpriseInfo} events to.
     *
     * @return the ring buffer of the started Disruptor
     */
    @Bean
    public RingBuffer<EnterpriseInfo> enterpriseInfoRingBuffer() {
        // Inter-bean call: inside a @Configuration class this returns the singleton bean,
        // it does not build a second Disruptor.
        return enterpriseInfoDisruptor().getRingBuffer();
    }
}
关键的一步设置是 //设置多线程消费者
disruptor.handleEventsWithWorkerPool(arr1);
这样才是开启了10个线程同时处理不同的数据
而如果使用
disruptor.handleEventsWith(arr1);
则是开启10个线程,但每条数据都会被这10个消费者各消费一次(广播模式),无法让多个消费者分摊处理不同的数据。原因在于两个API接收的参数类型不同:disruptor.handleEventsWithWorkerPool(arr1)的参数是WorkHandler类型,而disruptor.handleEventsWith(arr1)的参数是EventHandler类型。
然后具体的调用方法:
package com.dcboot.module.manage.disruptor.controller;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.dcboot.module.common.service.EnterpriseInfoService;
import com.dcboot.module.entity.EnterpriseInfo;
import com.lmax.disruptor.RingBuffer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * Test endpoint that reads enterprises from MySQL and publishes them to the Disruptor
 * ring buffer, from where the consumers index them into Elasticsearch.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/8/9 10:41
 */
@RestController
@Api(tags = "测试disruptor")
@RequestMapping("/testDisruptor")
@AllArgsConstructor
@Slf4j
public class DisruptorTestController {

    private final RingBuffer<EnterpriseInfo> enterpriseInfoRingBuffer;
    private final EnterpriseInfoService enterpriseInfoService;

    /**
     * Loads name, USCC and id of every enterprise and publishes one event per row.
     */
    @PostMapping("/web/insertEnterprise")
    @ApiOperation("从mysql中同步数据到ElasticSearch")
    public void testSendMessage() {
        List<EnterpriseInfo> enterpriseInfoList = enterpriseInfoService.list(Wrappers.<EnterpriseInfo>lambdaQuery()
                .select(EnterpriseInfo::getEnterpriseName, EnterpriseInfo::getUscc, EnterpriseInfo::getId));
        String tableName = EnterpriseInfo.class.getAnnotation(TableName.class).value();
        log.info("从mysql中{}表查出{}条数据", tableName, enterpriseInfoList.size());
        for (EnterpriseInfo enterpriseInfo : enterpriseInfoList) {
            publish(enterpriseInfo);
        }
    }

    /** Copies the selected fields of {@code source} into the next ring-buffer slot and publishes it. */
    private void publish(EnterpriseInfo source) {
        long sequence = enterpriseInfoRingBuffer.next();
        try {
            EnterpriseInfo slot = enterpriseInfoRingBuffer.get(sequence);
            slot.setEnterpriseName(source.getEnterpriseName());
            slot.setUscc(source.getUscc());
            slot.setId(source.getId());
        } finally {
            // Always publish a claimed sequence: an unpublished one would block the
            // consumers from ever advancing past it.
            enterpriseInfoRingBuffer.publish(sequence);
        }
    }
}
这里为了方便写了一个接口测试类,测试方便,项目中我使用的是定时任务,每天固定时间去同步数据:
package com.dcboot.module.manage.schedule;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.dcboot.module.common.service.EnterpriseInfoService;
import com.dcboot.module.entity.EnterpriseInfo;
import com.lmax.disruptor.RingBuffer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Scheduled job that reads enterprises from MySQL and publishes them to the Disruptor
 * ring buffer, from where the consumers index them into Elasticsearch.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/8/9 9:43
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataSyncTaskSchedule {

    private final RingBuffer<EnterpriseInfo> enterpriseInfoRingBuffer;
    private final EnterpriseInfoService enterpriseInfoService;

    /**
     * Synchronises enterprise archive data from MySQL to Elasticsearch by publishing one
     * event per row (name, USCC and id only).
     *
     * <p>The run time is taken from the {@code cron.sync-task} property. The original note
     * said "every Sunday"; the effective schedule is whatever that property is set to.
     *
     * @author pengshaoshuan
     * @date 2022/8/9
     */
    @Scheduled(cron = "${cron.sync-task}")
    public void asyncEnterpriseArchive() {
        List<EnterpriseInfo> enterpriseInfoList = enterpriseInfoService.list(Wrappers.<EnterpriseInfo>lambdaQuery()
                .select(EnterpriseInfo::getEnterpriseName, EnterpriseInfo::getUscc, EnterpriseInfo::getId));
        String tableName = EnterpriseInfo.class.getAnnotation(TableName.class).value();
        log.info("从mysql中{}表查出{}条数据", tableName, enterpriseInfoList.size());
        for (EnterpriseInfo enterpriseInfo : enterpriseInfoList) {
            publish(enterpriseInfo);
        }
    }

    /** Copies the selected fields of {@code source} into the next ring-buffer slot and publishes it. */
    private void publish(EnterpriseInfo source) {
        long sequence = enterpriseInfoRingBuffer.next();
        try {
            EnterpriseInfo slot = enterpriseInfoRingBuffer.get(sequence);
            slot.setEnterpriseName(source.getEnterpriseName());
            slot.setUscc(source.getUscc());
            slot.setId(source.getId());
        } finally {
            // Always publish a claimed sequence: an unpublished one would block the
            // consumers from ever advancing past it.
            enterpriseInfoRingBuffer.publish(sequence);
        }
    }
}
cron表达式配置在yml文件中
还有就是实现的EnterpriseLibIndexEventHandler 类中涉及到的三个类
EnterpriseIndexLib和EnterpriseIndexLibMapper以及SpringUtil
创建EnterpriseIndexLib和EnterpriseIndexLibMapper需要再引入一下ES客户端操作jar包,需要操作Elastic Search
<dependency>
<groupId>cn.easy-es</groupId>
<artifactId>easy-es-boot-starter</artifactId>
<version>0.9.60</version>
</dependency>
可以使用这个easy-es来替代spring官方的spring-data-elasticsearch
创建ES索引对应的实体类: EnterpriseIndexLib
package com.dcboot.module.esindex;
import cn.easyes.annotation.IndexField;
import cn.easyes.annotation.IndexId;
import cn.easyes.annotation.IndexName;
import cn.easyes.common.constants.Analyzer;
import cn.easyes.common.enums.FieldStrategy;
import cn.easyes.common.enums.FieldType;
import cn.easyes.common.enums.IdType;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Elasticsearch document for the {@code enterprise_index_lib} index (easy-es mapping).
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/7/27 20:36
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@IndexName(value = "enterprise_index_lib")
@Data
public class EnterpriseIndexLib {

    /** Document id, supplied by the application (customized id type). */
    @IndexId(type = IdType.CUSTOMIZE)
    private String id;

    /** Enterprise name; keyword+text field analyzed with ik_smart for indexing and search. */
    @IndexField(
            fieldType = FieldType.KEYWORD_TEXT,
            analyzer = Analyzer.IK_SMART,
            searchAnalyzer = Analyzer.IK_SMART,
            strategy = FieldStrategy.NOT_EMPTY)
    private String enterpriseName;

    /** Unified social credit code; keyword field. */
    @IndexField(fieldType = FieldType.KEYWORD, strategy = FieldStrategy.NOT_EMPTY)
    private String uscc;

    /**
     * Creation time of the document.
     * Author's note: do not use {@code java.util.Date} here; it would be stored in ES as a long.
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @IndexField(fieldType = FieldType.DATE, dateFormat = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Last update time of the document. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @IndexField(fieldType = FieldType.DATE, dateFormat = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}
easy-es的查询借鉴了mybatis-plus,所以查询依赖mapper接口,接着创建对应的查询mapper
package com.dcboot.module.esmapper;
import cn.easyes.core.conditions.interfaces.BaseEsMapper;
import com.dcboot.module.esindex.EnterpriseIndexLib;
/**
 * easy-es mapper for {@link EnterpriseIndexLib} documents. It declares no methods of its
 * own; all operations come from {@link BaseEsMapper}.
 *
 * @author xiaomifeng1010
 * @version 1.0
 * @date 2022/7/27 20:51
 */
public interface EnterpriseIndexLibMapper extends BaseEsMapper<EnterpriseIndexLib> {}
还有一个SpringUtil工具类:
package com.dcboot.base.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component("springUtil")
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
public SpringUtil() {
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> tClass) {
return getApplicationContext().getBean(tClass);
}
}
还有注意点就是启动类上加了mybatis的mapper扫描注解,是为了只扫描mysql的查询mapper,所以ES的mapper查询类需要另外创建package
package com.dcboot;
import cn.easyes.starter.register.EsMapperScan;
import com.dcboot.base.config.swagger.SwaggerConfig;
import com.dcboot.module.common.extend.controller.DeptExtendController;
import com.dcboot.module.util.CommonUtil;
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.config.EnableMongoAuditing;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.security.core.context.SecurityContextHolder;
/**
 * Spring Boot entry point. Besides starting the application it enables Knife4j, Mongo
 * auditing, scheduling and retry, and configures three separate scans: MyBatis mappers,
 * easy-es mappers and components.
 */
@Slf4j
@SpringBootApplication
@EnableKnife4j
@EnableMongoAuditing
// Scan only the MyBatis mapper package; the MongoDB DAO layer must not be picked up here.
@MapperScan(basePackages = {"com.dcboot.module.common.mapper"})
@ComponentScan(value = {"com.dcboot.*"},
        excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = SwaggerConfig.class))
@EnableScheduling
@EnableRetry
@EsMapperScan("com.dcboot.**.esmapper")
public class FSPInstallApplication {

    /**
     * Starts the application and logs the base URL plus the Swagger and Knife4j URLs.
     *
     * @param args command-line arguments passed to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext application = SpringApplication.run(FSPInstallApplication.class, args);
        log.info("SwaggerConfiguration:{}", application.getBeansOfType(DeptExtendController.class));
        Environment env = application.getEnvironment();
        String ip = CommonUtil.getInet4Address().getHostAddress();
        // Fall back to Spring Boot's defaults so the URL never contains the text "null"
        // when these properties are not configured.
        String port = env.getProperty("server.port", "8080");
        String path = env.getProperty("server.servlet.context-path", "");
        String baseUrl = "http://" + ip + ":" + port + path;
        // These are informational start-up messages, so they are logged at INFO, not ERROR.
        log.info("{}", baseUrl);
        log.info("swagger接口路径");
        log.info("{}/swagger-ui.html", baseUrl);
        log.info("knife4j接口路径");
        log.info("{}/doc.html", baseUrl);
    }

    /**
     * Switches the security context holder to the inheritable-thread-local strategy so that
     * asynchronous tasks can inherit the {@code SecurityContext}.
     *
     * <p>Note: this only helps for threads that Spring itself creates (for example for
     * {@code @Async} methods). For threads created manually, use
     * {@link org.springframework.security.concurrent.DelegatingSecurityContextRunnable} or
     * {@link org.springframework.security.concurrent.DelegatingSecurityContextCallable};
     * {@link org.springframework.security.concurrent.DelegatingSecurityContextExecutorService}
     * is the better choice for propagating the context.
     *
     * @author pengshaoshuan
     * @date 2022/8/1
     * @return an {@link InitializingBean} that sets the strategy name during bean initialisation
     */
    @Bean
    public InitializingBean initializingBean() {
        return () -> SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
    }
}
es的mapper类存放在com.dcboot.module.esmapper包下(即启动类上@EsMapperScan("com.dcboot.**.esmapper")所扫描的包)
项目中的elastic-search连接配置信息
对应的elastic search的index表是这样的:
推荐使用这个国产的ElasticView可视化管理客户端,个人感觉比elasticsearch head和kibana更好用,更方便
如何安装部署可以到ElasticView官网查看,推荐使用docker安装,非常快速方便,首页是这样的
教程使用的是我们项目中的其中的一个index
以navicat形式可视化展示es的index,展示效果类似关系型数据库的表格形式(也比dbeaver的可视化好用)
使用disruptor的测试接口测试一下
从控制台打印的日志可以看出,disruptor正在往队列中存放从mysql中查询出来的数据,而handler则从队列中消费,然后存入到ES中
同时从queue-thread-pool-数字,可以看到多个线程的消费者,消费的数据是不同的