当前位置: 首页 > news >正文

Dubbo线程池

前言

    Dubbo使用Netty作为网络调用框架,Netty是一个Reactor模型的框架,线程模型分为boss线程池和worker线程池,boss线程池负责监听、分配事件,worker线程池负责处理事件,简单说就是boss线程池负责hold请求,并分发到worker池,worker线程池负责处理具体事件。

    dubbo在原本的netty中的线程(boss线程和worker)做了一些修改,将其定义为io线程,而后由实现了一套用于处理业务的业务线程池,这就和上一篇介绍的Dubbo协议下的服务端线程模型产生了关联,dubbo的io线程监听请求,业务处理由dubbo自定义的线程池处理,这里将请求分发到具体的业务线程池就是由Dispatcher实现的,默认是AllDispatcher,上一篇已经简单介绍了Dubbo协议的线程池的分发模型,这篇文章就介绍下Dubbo究竟自定义了哪几种线程池的实现,并且都是怎么实现的。

  • 注:Apache Dubbo版本为3.0.7

Dubbo线程池接口ThreadPool

在这里插入图片描述

    Dubbo自定义的线程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool,并且提供了四种实现分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPoolThreadPool接口是SPI的,如果不指定线程池的具体实现默认是fixed,在项目中配置如下:配置线程池类型是fixed,线程数为100,线程模型是all

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

ThreadPool代码如下,接下来分别简单介绍一下四种线程池的具体实现

@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

CachedThreadPool缓存线程池

    该线程池是缓存类型的,当空闲到一定时间时会将线程删掉,使用时再创建,具体dubbo的实现如下,代码实现很简单,就是使用JUC的ThreadPoolExecutor创建了一个缓存类型的线程池,将maximumPoolSize设置成Integer.MAX_VALUE,keepAliveTime设置成60000毫秒,队列大小设置成0,当超过任务数超过corePoolSize就会直接创建worker线程,当线程空闲60s后就会被销毁。

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

FixedThreadPool固定线程数的线程池

    该线程池是固定线程数的线程池实现,具体实现也是使用JUC的ThreadPoolExecutor创建了一个固定线程数的线程池,通过url中配置的threads,将corePoolSize和maximumPoolSize都设置成threads的数量,并且keepAliveTime设置成0。

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

LimitedThreadPool可伸缩线程池

    虽然叫可伸缩线程池,但是实际上只能伸不能缩,官网上说是为了突然大量的流量引起性能问题,具体实现就是将keepAliveTime设置成无限大,这样当队列满了后就会创建线程达到maximumPoolSize,新创建的这些线程因为keepAliveTime设置成无限大所以也不会销毁了。

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

EagerThreadPool

    Eager单词是渴望的,热切地的意思,这个线程池所实现的逻辑是,当任务数超过corePoolSize但小于maximumPoolSize时不是将新任务放到队列中,而是优先创建新的worker线程,当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException

    EagerThreadPool线程池就不是通过JUC的ThreadPoolExecutor实现的了,而是继承ThreadPoolExecutor自己实现一些逻辑,下面一步一步看。

  • EagerThreadPool

    Dubbo自己实现了阻塞队列TaskQueue和线程池EagerThreadPoolExecutor,从EagerThreadPool的代码中看不到该类型线程池的核心逻辑,核心逻辑是在TaskQueue代码中,这里跳过直接看TaskQueue代码。

public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
  • TaskQueue

    Dubbo的EagerThreadPool是通过TaskQueueoffer方法实现的,逻辑就是当提交到线程池任务时,如果任务数大于corePoolSize,会将任务offerTaskQueue中,这时如果活跃的线程数大于等于线程池大小,并且当前线程数小于maximumPoolSize时就会伪装成放入到队列失败,这时线程池就会创建线程,从而实现超过corePoolSize不超过maximumPoolSize时创建worker线程而不是将任务放入到队列中。

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getActiveCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // 伪装放入队列失败,让线程池创建线程
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
  • EagerThreadPoolExecutor

    当任务数大于maximumPoolSize时,线程池会抛出RejectedExecutionExceptionEagerThreadPoolExecutor捕获这个异常,并且调用TaskQueueretryOffer方法尝试放入队列,这样就实现了当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException,代码如下:

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // 重新尝试将任务放到队列中.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                throw new RejectedExecutionException(x);
            }
        }
    }
}

总结

    Dubbo实现了自定义线程池,其核心接口是ThreadPool,该接口是SPI的默认的实现是fixed,Dubbo提供了四种实现,分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPool

相关文章:

  • 面试必问 | 必须了解的MySQL三大日志:binlog、redolog 和 undolog
  • 【C++】obj模型文件解析(tiny_obj_loader)
  • 数智随行 | 财务数字化转型的抓手:业务能力标准化
  • 22 C++设计模式之备忘录(Memento)模式
  • 小物体的目标检测的研究综述
  • 一文搞定Linux信号
  • 跟着MindSpore一起学习深度概率
  • 模型的动态LOD优化
  • 人工智能学习日记------KNN分类
  • Salesforce撤离中国后,谁来缓解在华跨国企业的焦虑?
  • 分布式系列精讲 分布式系统和单体系统之间到底有什么区别?
  • 什么是物联网数据采集网关?物联网数据采集网关的特点
  • 【vue3】05. 跟着官网学习vue3
  • 金九银十,阿里高级测开给面试者的十大建议
  • 使能OpenHarmony富设备产品化落地,润和软件HH-SCDAYU110通过兼容性测评
  • Google 是如何开发 Web 框架的
  • IE9 : DOM Exception: INVALID_CHARACTER_ERR (5)
  • 【mysql】环境安装、服务启动、密码设置
  • 【技术性】Search知识
  • 78. Subsets
  • android百种动画侧滑库、步骤视图、TextView效果、社交、搜房、K线图等源码
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • canvas实际项目操作,包含:线条,圆形,扇形,图片绘制,图片圆角遮罩,矩形,弧形文字...
  • Debian下无root权限使用Python访问Oracle
  • Java读取Properties文件的六种方法
  • Linux各目录及每个目录的详细介绍
  • pdf文件如何在线转换为jpg图片
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • PHP那些事儿
  • Promise面试题2实现异步串行执行
  • RxJS: 简单入门
  • vagrant 添加本地 box 安装 laravel homestead
  • 大型网站性能监测、分析与优化常见问题QA
  • 关于Flux,Vuex,Redux的思考
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 普通函数和构造函数的区别
  • 什么是Javascript函数节流?
  • 听说你叫Java(二)–Servlet请求
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 一个6年java程序员的工作感悟,写给还在迷茫的你
  • 自定义函数
  • 你学不懂C语言,是因为不懂编写C程序的7个步骤 ...
  • ​​​​​​​sokit v1.3抓手机应用socket数据包: Socket是传输控制层协议,WebSocket是应用层协议。
  • #{}和${}的区别是什么 -- java面试
  • (C语言)共用体union的用法举例
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (ZT)薛涌:谈贫说富
  • (独孤九剑)--文件系统
  • (二)构建dubbo分布式平台-平台功能导图
  • (附源码)springboot建达集团公司平台 毕业设计 141538
  • (附源码)springboot优课在线教学系统 毕业设计 081251
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • (十)c52学习之旅-定时器实验
  • (学习日记)2024.02.29:UCOSIII第二节
  • **PHP二维数组遍历时同时赋值