当前位置: 首页 > news >正文

Springboot 自定义线程池 ThreadPoolTaskExecutor

场景

假设 需要使用多线程清理es中的历史数据

知识

参数解释

 
  1. corePoolSize(核心线程数):线程池中的核心线程数量,即使线程池处于空闲状态,这些核心线程也不会被销毁。
  2. maximumPoolSize(最大线程数):线程池允许创建的最大线程数量。如果阻塞队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
  3. keepAliveTime(非核心线程的空闲时间):非核心线程等待keepAliveTime时间后还没有获取到任务就会自动销毁。
  4. unit(空闲时间单位)keepAliveTime的时间单位。
  5. workQueue(任务队列):用于保存等待执行的任务的阻塞队列。
  6. ThreadFactory(线程工厂):用于创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,后期方便定位问题。
  7. RejectedExecutionHandler(拒绝策略):当线程池中的线程达到maximumPoolSize,说明线程池处于饱和状态,此时仍然有任务提交过来,那么必须采取一种策略处理提交的新任务。

拒绝策略:当线程池中的线程达到最大数量,且任务队列已满时,需要采取一种策略来处理新提交的任务。ThreadPoolTaskExecutor提供了以下几种拒绝策略:

 
  1. AbortPolicy(默认拒绝策略):当实际线程数量大于维护队列中设定的数量时,将会触发拒绝任务的处理程序,它将抛出RejectedExecutionException
  2. DiscardPolicy:当实际线程数量大于维护队列中设定的数量时,新提交的任务将被静默丢弃。
  3. DiscardOldestPolicy:当实际线程数量大于维护队列中设定的数量时,队列中最老的任务将被静默丢弃,新任务将被放入队列。
  4. CallerRunsPolicy:当实际线程数量大于维护队列中设定的数量时,多出来的任务将由调用线程(主线程)处理。

实现方案

1.一个线程安全(保证时间段连贯)的参数生产方法(产生es 清理所需的时间段,索引,时间字段),定义一个外配置的json文件去做持久化记录时间段的位置(根据个人需要,完全可以摈弃)

2.一个es 清理的方法,我们用es的_delete_by_query 去处理

3.一个自定义线程池elasticsearchClearPool,定时任务去调用es 的清理方法 定时任务的线程为es-clear-main 清理的主线程,elasticsearchClearPool为es数据的清理的线程,

线程池的拒绝策略设置为:ThreadPoolExecutor.CallerRunsPolicy() (使用该策略保证清理时间的连贯性)

4.一个定时任务定时启动清理方法 使用fixedDelay  保证单一性

代码

只贴部分与文章相关的代码(存在冗余,使用按需改动即可)

自定义线程池配置


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @author sszdzq*/
@Configuration
public class ThreadPoolTaskConfig {@Bean(name = "elasticsearchClearPool")public Executor elasticsearchClearPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心线程池大小executor.setCorePoolSize(5);//最大线程数executor.setMaxPoolSize(9);//队列容量executor.setQueueCapacity(1);//活跃时间executor.setKeepAliveSeconds(60);//线程名字前缀executor.setThreadNamePrefix("es-clear-pool-");// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}}

 参数生成+清理方法


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import fri.bhlz.bean.BetweenTime;
import fri.bhlz.bean.EsParams;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;@Service
@Slf4j
public class ClearElasticSearchService {@Autowiredprivate RestTemplate restTemplate;@Value("${elasticsearch.index}")private String CLEAR_INDEX_FIELD;@Value("${elasticsearch.retention:90}")private String ES_DATA_RETENTION;@Value("${spring.elasticsearch.hostname}")private String HOST_NAME;@Value("${elasticsearch.clear:false}")private String clear;@Synchronizedpublic EsParams getClearParams(String indexFiled) {//读取jsonJSONObject js = cjrReadJson();String index = indexFiled.split(":")[0];String field = indexFiled.split(":")[1];String record = index + "-date";js = validJson(js, record);LocalDateTime st = js.getDate(record).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();LocalDateTime et = st.plusHours(1);EsParams es = new EsParams().setIndices(index).setClearTimeField(field);es.setClearStartTime(st);es.setClearEndTime(et);//持久化jsonjs.put(record, et.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));cjrWriteJson(js);log.debug("获取es时间:{} -> {}", es.getClearStartTime(), es.getClearEndTime());return es;}private JSONObject validJson(JSONObject js, String recordField) {js = ObjectUtils.isEmpty(js) ? new JSONObject() : js;if (!js.containsKey(recordField)) {js.put(recordField, LocalDateTime.now().minusYears(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}return js;}@Async("elasticsearchClearPool")public ResponseEntity<JSONObject> deleteByQuery(EsParams j) {log.debug("开始处理{} {}->{}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime());//构建查询体SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//查询条件 时间范围RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(j.getClearTimeField());rangeQueryBuilder.lt(j.getClearEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rangeQueryBuilder.gte(j.getClearStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));searchSourceBuilder.postFilter(rangeQueryBuilder);searchSourceBuilder.toString();log.debug("请求体:{}", searchSourceBuilder);HttpHeaders httpHeaders = new HttpHeaders();HttpEntity httpEntity = new HttpEntity(searchSourceBuilder.toString(), httpHeaders);ResponseEntity<JSONObject> resp = restTemplate.postForEntity("http://" + HOST_NAME + "/" + j.getIndices() + "/_delete_by_query", httpEntity, JSONObject.class);if (resp.getStatusCode().equals(HttpStatus.OK)) {log.debug(resp.getBody().toString(SerializerFeature.DisableCircularReferenceDetect));log.info(" Elasticsearch清理 {} {}->{} {} {}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime(), resp.getStatusCode().value(), resp.getBody().getString("deleted"));}return resp;}private void getBetweenTime(BetweenTime bt) {bt.setStartTime(bt.getStartTime().plusHours(1));bt.setEndTime(bt.getStartTime().plusHours(1));bt.setStartTimeStr(bt.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));bt.setEndTimeStr(bt.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}@Value("${clears.record-file}")private String recordPath;public JSONObject cjrReadJson() {createFile(recordPath);try (FileInputStream fis = new FileInputStream(recordPath)) {String jsonContent = new String(FileCopyUtils.copyToByteArray(fis), StandardCharsets.UTF_8);if (!StringUtils.hasText(jsonContent)) {return new JSONObject();}return JSON.parseObject(jsonContent);} catch (Exception e) {e.printStackTrace();return new JSONObject();}}public void cjrWriteJson(JSONObject j) {try (FileOutputStream fos = new FileOutputStream(recordPath)) {byte[] bytes = j.toString(SerializerFeature.PrettyFormat).getBytes(StandardCharsets.UTF_8);// 将字节数据写入文件FileCopyUtils.copy(bytes, fos);} catch (Exception e) {e.printStackTrace();}}private void createFile(String path) {try {File file = new File(path);if (!file.exists()) {file.createNewFile();}} catch (IOException e) {e.printStackTrace();}}
}

 定时任务

package fri.bhlz.schedule;import fri.bhlz.bean.EsParams;
import fri.bhlz.service.ClearElasticSearchService;
import fri.bhlz.service.ClearService;
import fri.bhlz.service.VerificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
@Slf4j
public class ScheduleTask {@Autowiredprivate ClearService clearService;@Autowiredprivate VerificationService verificationService;@Value("${elasticsearch.index}")private String CLEAR_INDEX_FIELD;@Value("${elasticsearch.retention:90}")private String ES_DATA_RETENTION;@Value("${elasticsearch.clear:false}")private String clear;@Autowiredprivate ClearElasticSearchService clearElasticSearchService;@Scheduled(initialDelay = 1500, fixedDelay = 1000 * 60 * 5)public void clearEsData() {Thread.currentThread().setName("es-clear-main");if (!Boolean.valueOf(clear)) {return;}log.info("clear elasticsearch start");LocalDateTime endTime = LocalDateTime.now().minusDays(Integer.parseInt(ES_DATA_RETENTION));for (String indexFiled : CLEAR_INDEX_FIELD.split(",")) {LocalDateTime now = endTime.minusDays(1);while (now.isBefore(endTime)) {EsParams params = clearElasticSearchService.getClearParams(indexFiled);clearElasticSearchService.deleteByQuery(params);now = params.getClearEndTime();}}log.info("clear elasticsearch end");}
}
spring:elasticsearch:hostname: 127.0.0.1:9200
elasticsearch:clear: trueindex: index_aa:create_time,index_bb:createtime,index_cc:save_timeretention: 90

注意事项

调用时一定要保证非本类的方法直接调用@Async标注的方法,不然会不生效

相关文章:

  • 标准库算法
  • Android 观察者模式(OBSERVER)应用详解
  • Spring与Netty底层源码解析
  • 一个基于HOOK机制的微信机器人
  • 论文阅读--ViLD
  • 力扣226. 翻转二叉树(DFS的两种思路)
  • 开源模型应用落地-模型量化-Qwen1.5-7B-Chat-GPTQ-Int8(一)
  • 初见flyway
  • MongoDB 和 MySQL 的对比
  • Flutter 页面布局 Flex Expanded弹性布局
  • 谷歌上架,个人号比企业号好上?“14+20”封测如何解决,你知道了吗
  • 基于RV1126的AI网络摄像机AHD、CVBS、HDMI接口的区别有哪些?支持8路AHD摄像头,支持AI实时分析
  • Python-温故知新
  • 2024上海国际化工自动化仪器仪表展览会
  • 数据结构_栈在括号匹配中的应用_代码
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • 【347天】每日项目总结系列085(2018.01.18)
  • 2017前端实习生面试总结
  • Docker入门(二) - Dockerfile
  • exports和module.exports
  • Hexo+码云+git快速搭建免费的静态Blog
  • java中具有继承关系的类及其对象初始化顺序
  • JS专题之继承
  • Just for fun——迅速写完快速排序
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • mysql_config not found
  • Netty源码解析1-Buffer
  • Shell编程
  • socket.io+express实现聊天室的思考(三)
  • Spring核心 Bean的高级装配
  • SwizzleMethod 黑魔法
  • uni-app项目数字滚动
  • vue-cli3搭建项目
  • WePY 在小程序性能调优上做出的探究
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 机器学习 vs. 深度学习
  • 看图轻松理解数据结构与算法系列(基于数组的栈)
  • 实现简单的正则表达式引擎
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 携程小程序初体验
  • 在Unity中实现一个简单的消息管理器
  • 阿里云服务器如何修改远程端口?
  • ​人工智能之父图灵诞辰纪念日,一起来看最受读者欢迎的AI技术好书
  • ​软考-高级-系统架构设计师教程(清华第2版)【第12章 信息系统架构设计理论与实践(P420~465)-思维导图】​
  • #控制台大学课堂点名问题_课堂随机点名
  • (1)(1.11) SiK Radio v2(一)
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (3)(3.5) 遥测无线电区域条例
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (C语言)字符分类函数
  • (day18) leetcode 204.计数质数
  • (python)数据结构---字典
  • (void) (_x == _y)的作用
  • (vue)el-checkbox 实现展示区分 label 和 value(展示值与选中获取值需不同)