当前位置: 首页 > news >正文

Springboot整合Nacos实现动态线程池

又是美好的一天呀~
个人博客地址: huanghong.top

往下看看~

    • 内容简介
    • 代码实现
      • 配置文件
        • pom.xml
        • bootstrap.yaml
        • dynamic-threadpool.yaml
      • 配置Bean
        • ThreadPoolProperties
        • ThreadPoolConfig
      • 枚举
        • QueueTypeEnum
        • RejectedTypeEnum
      • refresher
      • 测试类
    • 备注内容

内容简介

基于Springboot整合Nacos(Config)构建项目,客户端监听Nacos服务端配置变化动态修改线程池相关参数。

代码实现

配置文件

pom.xml

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!--nacos client 2.0.4-->
    <spring.boot.version>2.6.11</spring.boot.version>
    <spring.cloud.alibaba.version>2021.0.4.0</spring.cloud.alibaba.version>
    <spring.cloud.version>2021.0.4</spring.cloud.version>
</properties>

<dependencyManagement>
    <dependencies>
        <!--spring cloud-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--spring cloud alibaba-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring.cloud.alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--springboot-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!--web-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--nacos config-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <!--spring cloud bootstrap-->
    <!--启动bootstrap配置文件中配置无法加载-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
    <!--配置注入提示问题-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <optional>true</optional>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.25</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>aliyun-releases</id>
        <name>ali_repo</name>
        <url>https://maven.aliyun.com/repository/public</url>
    </repository>
</repositories>

<build>
    <finalName>${artifactId}</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.3.7.RELEASE</version>
        </plugin>
    </plugins>
</build>

bootstrap.yaml

server:
  port: 17015
spring:
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        extension-configs:
          - dataId: dynamic-threadpool.yaml
            group: DEFAULT_GROUP
            #开启配置文件自动刷新
            refresh: true
#下述配置需要配置到nacos中(dynamic-threadpool.yaml)
dynamic:
  threadpool:
    #可以动态变更(threadPoolFactory也可以)
    corePoolSize: 1
    maximumPoolSize: 5
    keepAliveTime: 1000
    handlerType: CALLER_RUNS_POLICY
    #不能动态变更的配置内容
    unit: MILLISECONDS
    queueType: LINKED_BLOCKING_QUEUE
    #队列长度
    queueCapacity: 100

dynamic-threadpool.yaml

dynamic:
  threadpool:
  	#可以动态变更(threadPoolFactory也可以)
    corePoolSize: 1
    maximumPoolSize: 1
    keepAliveTime: 5000
    handlerType: CALLER_RUNS_POLICY
    #不能动态变更的配置内容
    unit: MILLISECONDS
    queueType: LINKED_BLOCKING_QUEUE
    #队列长度
    queueCapacity: 100

配置Bean

ThreadPoolProperties

/**
 * @Time 2023-03-30 11:09
 * Created by Huang
 * className: ThreadPoolProperties
 * Description: 定义线程池参数
 */
@Data
@Component
@ConfigurationProperties(prefix = "dynamic.threadpool")
public class ThreadPoolProperties {
    private Integer corePoolSize = 5;
    private Integer maximumPoolSize = 10;
    private Long keepAliveTime = 5000L;
    private TimeUnit unit = TimeUnit.MILLISECONDS;
    private Integer queueCapacity = 100;
    private QueueTypeEnum queueType;
    private RejectedTypeEnum handlerType;
}

ThreadPoolConfig

/**
 * @Time 2023-03-30 11:13
 * Created by Huang
 * className: ThreadPoolConfig
 * Description: 手动注入线程池Bean
 */
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor dynamicThreadPoolExecutor(ThreadPoolProperties threadPoolProperties) {
        QueueTypeEnum queueType = threadPoolProperties.getQueueType();
        BlockingQueue<Runnable> blockingQueue = QueueTypeEnum.buildBlockingQueue(queueType.getName(), threadPoolProperties.getQueueCapacity());
        RejectedTypeEnum handlerType = threadPoolProperties.getHandlerType();
        RejectedExecutionHandler handler = RejectedTypeEnum.buildRejectedExecutionHandler(handlerType.getName());
        return new ThreadPoolExecutor(threadPoolProperties.getCorePoolSize(),
                threadPoolProperties.getMaximumPoolSize(),
                threadPoolProperties.getKeepAliveTime(),
                threadPoolProperties.getUnit(),
                blockingQueue,
                Executors.defaultThreadFactory(),
                handler);
    }

}

枚举

QueueTypeEnum

/**
 * @Time 2023-03-30 11:39
 * Created by Huang
 * className: QueueTypeEnum
 * Description: 阻塞队列枚举类
 */
@Slf4j
public enum QueueTypeEnum {
    ARRAY_BLOCKING_QUEUE("ArrayBlockingQueue"),

    LINKED_BLOCKING_QUEUE("LinkedBlockingQueue"),

    PRIORITY_BLOCKING_QUEUE("PriorityBlockingQueue"),

    DELAY_QUEUE("DelayQueue");

    private final String name;

    QueueTypeEnum(String name) {
        this.name = name;
    }

    public static BlockingQueue<Runnable> buildBlockingQueue(String name, int capacity) {
        BlockingQueue<Runnable> blockingQueue = null;
        if (Objects.equals(name, ARRAY_BLOCKING_QUEUE.getName())) {
            blockingQueue = new ArrayBlockingQueue<>(capacity);
        } else if (Objects.equals(name, LINKED_BLOCKING_QUEUE.getName())) {
            blockingQueue = new LinkedBlockingQueue<>(capacity);
        } else if (Objects.equals(name, PRIORITY_BLOCKING_QUEUE.getName())) {
            blockingQueue = new PriorityBlockingQueue<>(capacity);
        } else if (Objects.equals(name, DELAY_QUEUE.getName())) {
            blockingQueue = new DelayQueue();
        }

        if (blockingQueue != null) {
            return blockingQueue;
        }

        log.warn("Cannot find specified BlockingQueue {}", name);
        log.warn("use default BlockingQueue {LinkedBlockingQueue}");
        return new LinkedBlockingQueue<>(capacity);
    }
    
    public String getName() {
        return name;
    }
}

RejectedTypeEnum

/**
 * @Time 2023-03-30 11:39
 * Created by Huang
 * className: QueueTypeEnum
 * Description: 拒绝策略枚举类
 */
@Slf4j
public enum QueueTypeEnum {
    ARRAY_BLOCKING_QUEUE("ArrayBlockingQueue"),

    LINKED_BLOCKING_QUEUE("LinkedBlockingQueue"),

    PRIORITY_BLOCKING_QUEUE("PriorityBlockingQueue"),

    DELAY_QUEUE("DelayQueue");

    private final String name;

    QueueTypeEnum(String name) {
        this.name = name;
    }

    public static BlockingQueue<Runnable> buildBlockingQueue(String name, int capacity) {
        BlockingQueue<Runnable> blockingQueue = null;
        if (Objects.equals(name, ARRAY_BLOCKING_QUEUE.getName())) {
            blockingQueue = new ArrayBlockingQueue<>(capacity);
        } else if (Objects.equals(name, LINKED_BLOCKING_QUEUE.getName())) {
            blockingQueue = new LinkedBlockingQueue<>(capacity);
        } else if (Objects.equals(name, PRIORITY_BLOCKING_QUEUE.getName())) {
            blockingQueue = new PriorityBlockingQueue<>(capacity);
        } else if (Objects.equals(name, DELAY_QUEUE.getName())) {
            blockingQueue = new DelayQueue();
        }

        if (blockingQueue != null) {
            return blockingQueue;
        }

        log.warn("Cannot find specified BlockingQueue {}", name);
        log.warn("use default BlockingQueue {LinkedBlockingQueue}");
        return new LinkedBlockingQueue<>(capacity);
    }

    public String getName() {
        return name;
    }
}

refresher

/**
 * @Time 2023-03-30 11:23
 * Created by Huang
 * className: NacosRefresher
 * Description: 监听RefreshScopeRefreshedEvent事件更新线程池参数
 */
@Component
public class NacosRefresher implements SmartApplicationListener {
    @Autowired
    private ThreadPoolProperties threadPoolProperties;
    @Autowired
    ThreadPoolExecutor dynamicThreadPoolExecutor;

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return RefreshScopeRefreshedEvent.class.isAssignableFrom(eventType);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if(event instanceof RefreshScopeRefreshedEvent){
            refreshThreadPoolProperties(threadPoolProperties);
        }
    }

    private void refreshThreadPoolProperties(ThreadPoolProperties threadPoolProperties) {
        //阻塞队列为final定义不能动态变更
        RejectedTypeEnum handlerType = threadPoolProperties.getHandlerType();
        RejectedExecutionHandler handler = RejectedTypeEnum.buildRejectedExecutionHandler(handlerType.getName());

        dynamicThreadPoolExecutor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        dynamicThreadPoolExecutor.setMaximumPoolSize(threadPoolProperties.getMaximumPoolSize());
        dynamicThreadPoolExecutor.setKeepAliveTime(threadPoolProperties.getKeepAliveTime(), threadPoolProperties.getUnit());
        dynamicThreadPoolExecutor.setRejectedExecutionHandler(handler);
    }
}

测试类

/**
 * @Time 2023-03-30 12:32
 * Created by Huang
 * className: ThreadPoolController
 * Description:
 */
@Slf4j
@RestController
public class ThreadPoolController {
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    @GetMapping("/")
    public JSONObject dynamicThreadPoolTest() {
        int corePoolSize = threadPoolExecutor.getCorePoolSize();
        int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
        long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
        BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
        RejectedExecutionHandler handler = threadPoolExecutor.getRejectedExecutionHandler();
        long taskCount = threadPoolExecutor.getTaskCount();
        int poolSize = threadPoolExecutor.getPoolSize();

        return new JSONObject() {{
            put("corePoolSize", corePoolSize);
            put("maximumPoolSize", maximumPoolSize);
            put("keepAliveTime", keepAliveTime);
            put("blockingQueue", blockingQueue.toString());
            put("handler", handler.toString());
            put("taskCount", taskCount);
            put("poolSize", poolSize);
        }};
    }

    @GetMapping("/submitTask/{content}")
    public JSONObject submitTask(@PathVariable String content) {
        threadPoolExecutor.execute(() -> {
            while (true) {
                log.info("thread: {} content: {}", Thread.currentThread().getName(), content);
                try {
                    TimeUnit.SECONDS.sleep(30);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        long taskCount = threadPoolExecutor.getTaskCount();
        int poolSize = threadPoolExecutor.getPoolSize();

        return new JSONObject(){{
            put("taskCount",taskCount);
            put("poolSize",poolSize);
            put("taskContent",content);
        }};
    }
}

备注内容

  1. 线程池中线程工厂ThreadFactory可动态变更;
  2. 如果执行任务过程中开启线程数量大于变更后的线程最大数量,线程池的执行任务数量依然为更改前的数量,但相关参数已完成变更;
  3. 阻塞队列、拒绝策略及其他相关线程池参数均可在上述代码基础上进行扩展

感谢阅读完本篇文章!!!
个人博客地址: huanghong.top

相关文章:

  • 软考中级到底有没有用?价值高吗?
  • java基础面试题(一)
  • 【算法题】831. 隐藏个人信息
  • arthas 源码分析 篇一 【结构篇】
  • HTML5 <abbr> 标签 和 HTML5 <applet> 标签
  • C++面向对象高级编程(上)
  • 安装和配置 Flask
  • Java --- 继承
  • Rabbitmq了解
  • 51单片机-LED篇
  • ThreeJS-3D引擎渲染从入门到入土 搞定前端前沿技术
  • chatGPT 会给程序员带来失业潮吗?
  • 2023美赛春季赛F题思路数据代码论文分享
  • 快速搭建python爬虫管理平台
  • 从 JDK 9 到 19,认识一个新的 Java 形态(内存篇)
  • AHK 中 = 和 == 等比较运算符的用法
  •  D - 粉碎叛乱F - 其他起义
  • ESLint简单操作
  • iBatis和MyBatis在使用ResultMap对应关系时的区别
  • jQuery(一)
  • js
  • vue.js框架原理浅析
  • webpack项目中使用grunt监听文件变动自动打包编译
  • 产品三维模型在线预览
  • 浮动相关
  • 高度不固定时垂直居中
  • 紧急通知:《观止-微软》请在经管柜购买!
  • 驱动程序原理
  • 听说你叫Java(二)–Servlet请求
  • 协程
  • 通过调用文摘列表API获取文摘
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​直流电和交流电有什么区别为什么这个时候又要变成直流电呢?交流转换到直流(整流器)直流变交流(逆变器)​
  • !! 2.对十份论文和报告中的关于OpenCV和Android NDK开发的总结
  • #HarmonyOS:基础语法
  • #Z2294. 打印树的直径
  • (1)Nginx简介和安装教程
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (二十一)devops持续集成开发——使用jenkins的Docker Pipeline插件完成docker项目的pipeline流水线发布
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (附源码)ssm考生评分系统 毕业设计 071114
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题
  • (十一)手动添加用户和文件的特殊权限
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • .net 反编译_.net反编译的相关问题
  • @ModelAttribute注解使用
  • @transactional 方法执行完再commit_当@Transactional遇到@CacheEvict,你的代码是不是有bug!...
  • @取消转义
  • [ 常用工具篇 ] POC-bomber 漏洞检测工具安装及使用详解
  • [BZOJ] 2006: [NOI2010]超级钢琴
  • [BZOJ1877][SDOI2009]晨跑[最大流+费用流]
  • [C++]:for循环for(int num : nums)
  • [CareerCup] 12.3 Test Move Method in a Chess Game 测试象棋游戏中的移动方法
  • [CareerCup] 2.1 Remove Duplicates from Unsorted List 移除无序链表中的重复项