Springboot整合Nacos实现动态线程池
又是美好的一天呀~
个人博客地址: huanghong.top
往下看看~
- 内容简介
- 代码实现
- 配置文件
- pom.xml
- bootstrap.yaml
- dynamic-threadpool.yaml
- 配置Bean
- ThreadPoolProperties
- ThreadPoolConfig
- 枚举
- QueueTypeEnum
- RejectedTypeEnum
- refresher
- 测试类
- 备注内容
内容简介
基于Springboot整合Nacos(Config)构建项目,客户端监听Nacos服务端配置变化动态修改线程池相关参数。
代码实现
配置文件
pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!-- Version set used by this demo (Nacos client 2.0.4 according to the original note) -->
    <spring.boot.version>2.6.11</spring.boot.version>
    <spring.cloud.alibaba.version>2021.0.4.0</spring.cloud.alibaba.version>
    <spring.cloud.version>2021.0.4</spring.cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud BOM -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Cloud Alibaba BOM -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring.cloud.alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Boot BOM (imported instead of inheriting spring-boot-starter-parent) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!-- Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Nacos config client -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <!-- Spring Cloud bootstrap: without this starter the settings in bootstrap.yaml are not loaded at startup -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
    <!-- Generates configuration metadata so the IDE can offer hints for @ConfigurationProperties;
         only needed at build time, hence optional -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <optional>true</optional>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.25</version>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>aliyun-releases</id>
        <name>ali_repo</name>
        <url>https://maven.aliyun.com/repository/public</url>
    </repository>
</repositories>
<build>
    <!-- ${artifactId} is deprecated; use the project.* form -->
    <finalName>${project.artifactId}</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <!-- Keep the plugin aligned with the Spring Boot version used by the application -->
            <version>${spring.boot.version}</version>
            <executions>
                <!-- Not inherited from spring-boot-starter-parent here, so bind repackage explicitly
                     to produce an executable jar -->
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
bootstrap.yaml
server:
  port: 17015
spring:
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        extension-configs:
          - dataId: dynamic-threadpool.yaml
            group: DEFAULT_GROUP
            # Refresh automatically when this config changes in Nacos
            refresh: true
# The settings below are meant to be stored in Nacos (dynamic-threadpool.yaml)
dynamic:
  threadpool:
    # Can be changed dynamically (per the original note, the thread factory could be as well)
    corePoolSize: 1
    maximumPoolSize: 5
    keepAliveTime: 1000
    handlerType: CALLER_RUNS_POLICY
    # Settings that are not changed dynamically
    unit: MILLISECONDS
    queueType: LINKED_BLOCKING_QUEUE
    # Queue capacity
    queueCapacity: 100
dynamic-threadpool.yaml
dynamic:
  threadpool:
    # Can be changed dynamically (per the original note, the thread factory could be as well)
    corePoolSize: 1
    maximumPoolSize: 1
    keepAliveTime: 5000
    handlerType: CALLER_RUNS_POLICY
    # Settings that are not changed dynamically
    unit: MILLISECONDS
    queueType: LINKED_BLOCKING_QUEUE
    # Queue capacity
    queueCapacity: 100
配置Bean
ThreadPoolProperties
/**
 * Thread pool parameters bound from configuration keys under {@code dynamic.threadpool}.
 *
 * <p>Created by Huang, 2023-03-30 11:09.
 */
@Data
@Component
@ConfigurationProperties(prefix = "dynamic.threadpool")
public class ThreadPoolProperties {

    /** Core pool size; defaults to 5. */
    private Integer corePoolSize = 5;

    /** Maximum pool size; defaults to 10. */
    private Integer maximumPoolSize = 10;

    /** Keep-alive time, expressed in {@link #unit}; defaults to 5000. */
    private Long keepAliveTime = 5_000L;

    /** Time unit of {@link #keepAliveTime}; defaults to milliseconds. */
    private TimeUnit unit = TimeUnit.MILLISECONDS;

    /** Capacity passed to the work queue; defaults to 100. */
    private Integer queueCapacity = 100;

    /** Work queue implementation to use; no default is set here. */
    private QueueTypeEnum queueType;

    /** Rejection policy to use; no default is set here. */
    private RejectedTypeEnum handlerType;
}
ThreadPoolConfig
/**
 * Registers the thread pool as a Spring bean, built from {@link ThreadPoolProperties}.
 *
 * <p>Created by Huang, 2023-03-30 11:13.
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * Creates the executor from the bound properties.
     *
     * <p>{@code queueType} and {@code handlerType} have no default in {@link ThreadPoolProperties},
     * so both may be {@code null} when the keys are missing from the configuration. A missing queue
     * type is passed on as a {@code null} name, which {@code QueueTypeEnum.buildBlockingQueue}
     * resolves to its default queue; a missing handler type falls back to
     * {@link ThreadPoolExecutor.AbortPolicy}, the JDK's own default.
     *
     * @param threadPoolProperties bound thread pool settings
     * @return the executor exposed as the {@code dynamicThreadPoolExecutor} bean
     */
    @Bean
    public ThreadPoolExecutor dynamicThreadPoolExecutor(ThreadPoolProperties threadPoolProperties) {
        QueueTypeEnum queueType = threadPoolProperties.getQueueType();
        String queueName = queueType == null ? null : queueType.getName();
        BlockingQueue<Runnable> blockingQueue =
                QueueTypeEnum.buildBlockingQueue(queueName, threadPoolProperties.getQueueCapacity());

        RejectedTypeEnum handlerType = threadPoolProperties.getHandlerType();
        RejectedExecutionHandler handler = handlerType == null
                ? new ThreadPoolExecutor.AbortPolicy()
                : RejectedTypeEnum.buildRejectedExecutionHandler(handlerType.getName());

        return new ThreadPoolExecutor(
                threadPoolProperties.getCorePoolSize(),
                threadPoolProperties.getMaximumPoolSize(),
                threadPoolProperties.getKeepAliveTime(),
                threadPoolProperties.getUnit(),
                blockingQueue,
                Executors.defaultThreadFactory(),
                handler);
    }
}
枚举
QueueTypeEnum
/**
 * Supported work queue implementations for the thread pool.
 *
 * <p>Created by Huang, 2023-03-30 11:39.
 */
@Slf4j
public enum QueueTypeEnum {
    ARRAY_BLOCKING_QUEUE("ArrayBlockingQueue"),
    LINKED_BLOCKING_QUEUE("LinkedBlockingQueue"),
    PRIORITY_BLOCKING_QUEUE("PriorityBlockingQueue"),
    DELAY_QUEUE("DelayQueue");

    /** Simple class name of the queue implementation this constant stands for. */
    private final String name;

    QueueTypeEnum(String name) {
        this.name = name;
    }

    /**
     * Creates the work queue identified by {@code name}.
     *
     * <p>Caveats: for {@code PriorityBlockingQueue} the {@code capacity} is only an initial
     * capacity (the queue is unbounded) and queued tasks must be {@link Comparable}; for
     * {@code DelayQueue} the {@code capacity} is ignored and queued tasks must implement
     * {@code Delayed}. Plain {@link Runnable} tasks fail with {@link ClassCastException}
     * when they are enqueued into either of these two queues.
     *
     * @param name     queue name as returned by {@link #getName()}; may be {@code null}
     * @param capacity capacity handed to the queue constructor
     * @return the matching queue, or a {@link LinkedBlockingQueue} when {@code name} is unknown
     */
    public static BlockingQueue<Runnable> buildBlockingQueue(String name, int capacity) {
        QueueTypeEnum type = fromName(name);
        if (type != null) {
            switch (type) {
                case ARRAY_BLOCKING_QUEUE:
                    return new ArrayBlockingQueue<>(capacity);
                case LINKED_BLOCKING_QUEUE:
                    return new LinkedBlockingQueue<>(capacity);
                case PRIORITY_BLOCKING_QUEUE:
                    return new PriorityBlockingQueue<>(capacity);
                case DELAY_QUEUE:
                    return newDelayQueue();
                default:
                    break;
            }
        }
        log.warn("Cannot find specified BlockingQueue {}", name);
        log.warn("use default BlockingQueue {LinkedBlockingQueue}");
        return new LinkedBlockingQueue<>(capacity);
    }

    /** Finds the constant whose {@link #getName()} equals {@code name}, or {@code null} if none. */
    private static QueueTypeEnum fromName(String name) {
        for (QueueTypeEnum candidate : values()) {
            if (Objects.equals(name, candidate.getName())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Creates a {@code DelayQueue} viewed as a queue of {@link Runnable}.
     *
     * <p>The cast is unchecked by necessity: a {@code DelayQueue} only accepts {@code Delayed}
     * elements, so callers must submit tasks that implement both interfaces.
     */
    @SuppressWarnings("unchecked")
    private static BlockingQueue<Runnable> newDelayQueue() {
        return (BlockingQueue<Runnable>) (BlockingQueue<?>) new DelayQueue<>();
    }

    public String getName() {
        return name;
    }
}
RejectedTypeEnum
/**
* @Time 2023-03-30 11:39
* Created by Huang
* className: QueueTypeEnum
* Description: 拒绝策略枚举类
*/
@Slf4j
public enum QueueTypeEnum {
ARRAY_BLOCKING_QUEUE("ArrayBlockingQueue"),
LINKED_BLOCKING_QUEUE("LinkedBlockingQueue"),
PRIORITY_BLOCKING_QUEUE("PriorityBlockingQueue"),
DELAY_QUEUE("DelayQueue");
private final String name;
QueueTypeEnum(String name) {
this.name = name;
}
public static BlockingQueue<Runnable> buildBlockingQueue(String name, int capacity) {
BlockingQueue<Runnable> blockingQueue = null;
if (Objects.equals(name, ARRAY_BLOCKING_QUEUE.getName())) {
blockingQueue = new ArrayBlockingQueue<>(capacity);
} else if (Objects.equals(name, LINKED_BLOCKING_QUEUE.getName())) {
blockingQueue = new LinkedBlockingQueue<>(capacity);
} else if (Objects.equals(name, PRIORITY_BLOCKING_QUEUE.getName())) {
blockingQueue = new PriorityBlockingQueue<>(capacity);
} else if (Objects.equals(name, DELAY_QUEUE.getName())) {
blockingQueue = new DelayQueue();
}
if (blockingQueue != null) {
return blockingQueue;
}
log.warn("Cannot find specified BlockingQueue {}", name);
log.warn("use default BlockingQueue {LinkedBlockingQueue}");
return new LinkedBlockingQueue<>(capacity);
}
public String getName() {
return name;
}
}
refresher
/**
 * Listens for {@link RefreshScopeRefreshedEvent} and applies the current
 * {@link ThreadPoolProperties} to the running thread pool.
 *
 * <p>Created by Huang, 2023-03-30 11:23.
 */
@Component
public class NacosRefresher implements SmartApplicationListener {

    @Autowired
    private ThreadPoolProperties threadPoolProperties;

    @Autowired
    ThreadPoolExecutor dynamicThreadPoolExecutor;

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return RefreshScopeRefreshedEvent.class.isAssignableFrom(eventType);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof RefreshScopeRefreshedEvent) {
            refreshThreadPoolProperties(threadPoolProperties);
        }
    }

    /**
     * Pushes core size, maximum size, keep-alive time and rejection handler into the executor.
     * The work queue is a final field of the executor and therefore cannot be replaced here.
     *
     * @param threadPoolProperties the freshly bound settings
     * @throws IllegalArgumentException if the configured core size exceeds the configured
     *                                  maximum size (nothing is applied in that case), or if
     *                                  the executor rejects one of the values
     */
    private void refreshThreadPoolProperties(ThreadPoolProperties threadPoolProperties) {
        int corePoolSize = threadPoolProperties.getCorePoolSize();
        int maximumPoolSize = threadPoolProperties.getMaximumPoolSize();
        if (corePoolSize > maximumPoolSize) {
            // Fail before touching the executor so a bad config is never half-applied.
            throw new IllegalArgumentException("corePoolSize (" + corePoolSize
                    + ") must not exceed maximumPoolSize (" + maximumPoolSize + ")");
        }

        // Resolve the handler up front; a missing handlerType keeps the current handler.
        RejectedTypeEnum handlerType = threadPoolProperties.getHandlerType();
        RejectedExecutionHandler handler = handlerType == null
                ? null
                : RejectedTypeEnum.buildRejectedExecutionHandler(handlerType.getName());

        // The executor requires core <= max after every single setter call (setCorePoolSize
        // enforces this from Java 9 on, setMaximumPoolSize always), so the order depends on
        // whether the pool is growing past its current maximum.
        if (corePoolSize > dynamicThreadPoolExecutor.getMaximumPoolSize()) {
            dynamicThreadPoolExecutor.setMaximumPoolSize(maximumPoolSize);
            dynamicThreadPoolExecutor.setCorePoolSize(corePoolSize);
        } else {
            dynamicThreadPoolExecutor.setCorePoolSize(corePoolSize);
            dynamicThreadPoolExecutor.setMaximumPoolSize(maximumPoolSize);
        }

        dynamicThreadPoolExecutor.setKeepAliveTime(
                threadPoolProperties.getKeepAliveTime(), threadPoolProperties.getUnit());
        if (handler != null) {
            dynamicThreadPoolExecutor.setRejectedExecutionHandler(handler);
        }
    }
}
测试类
/**
 * Demo endpoints for inspecting the thread pool and submitting long-running tasks to it.
 *
 * <p>Created by Huang, 2023-03-30 12:32.
 */
@Slf4j
@RestController
public class ThreadPoolController {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Reports the executor's current settings and counters.
     *
     * @return JSON with core/max pool size, keep-alive time (ms), queue, handler, task count
     *         and pool size
     */
    @GetMapping("/")
    public JSONObject dynamicThreadPoolTest() {
        // A plain JSONObject instead of double-brace initialisation: the latter creates an
        // anonymous subclass per call site that also captures the enclosing controller.
        JSONObject result = new JSONObject();
        result.put("corePoolSize", threadPoolExecutor.getCorePoolSize());
        result.put("maximumPoolSize", threadPoolExecutor.getMaximumPoolSize());
        result.put("keepAliveTime", threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS));
        result.put("blockingQueue", threadPoolExecutor.getQueue().toString());
        result.put("handler", threadPoolExecutor.getRejectedExecutionHandler().toString());
        result.put("taskCount", threadPoolExecutor.getTaskCount());
        result.put("poolSize", threadPoolExecutor.getPoolSize());
        return result;
    }

    /**
     * Submits a task that logs {@code content} every 30 seconds until its thread is interrupted,
     * which keeps one pool thread busy for observing pool behaviour.
     *
     * @param content text the task logs on each iteration
     * @return JSON with the task count and pool size read after submission, plus the content
     */
    @GetMapping("/submitTask/{content}")
    public JSONObject submitTask(@PathVariable("content") String content) {
        threadPoolExecutor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                log.info("thread: {} content: {}", Thread.currentThread().getName(), content);
                try {
                    TimeUnit.SECONDS.sleep(30);
                } catch (InterruptedException e) {
                    // Restore the flag and stop, so the pool can shut the task down.
                    Thread.currentThread().interrupt();
                    log.warn("thread: {} interrupted, stopping task",
                            Thread.currentThread().getName());
                    return;
                }
            }
        });

        JSONObject result = new JSONObject();
        result.put("taskCount", threadPoolExecutor.getTaskCount());
        result.put("poolSize", threadPoolExecutor.getPoolSize());
        result.put("taskContent", content);
        return result;
    }
}
备注内容
- 线程池中线程工厂ThreadFactory可动态变更;
- 如果任务执行过程中已创建的线程数量大于变更后的最大线程数,正在执行任务的线程不会被中断,线程池的线程数量暂时仍为变更前的数量;多余的线程会在空闲后被回收,而相关参数此时已完成变更;
- 阻塞队列、拒绝策略及其他相关线程池参数均可在上述代码基础上进行扩展
感谢阅读完本篇文章!!!
个人博客地址: huanghong.top