当前位置: 首页 > news >正文

nginx源码分析——线程池

源码: nginx 1.13.0-release
 
一、前言
     nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响。但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA)。其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的。下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误、不足还请大家指出,谢谢)
 
二、thread_pool线程池模块介绍
     nginx的主要功能都是由一个个模块构成的,thread_pool也不例外。线程池主要用于读取、发送文件等IO操作,避免慢速IO影响worker的正常运行。先引用一段官方的配置示例
Syntax: thread_pool name threads=number [max_queue=number];
Default: thread_pool default threads=32 max_queue=65536;
Context: main
     根据上述的配置说明,thread_pool是有名字的,上面的线程数目以及队列大小都是指每个worker进程中的线程,而不是所有worker中线程的总数。一个线程池中所有的线程共享一个队列,队列中的最大人数数量为上面定义的max_queue,如果队列满了的话,再往队列中添加任务就会报错。
 
     根据之前讲到过的模块初始化流程( 在master启动worker之前) create_conf--> command_set函数-->init_conf,下面就按照这个流程看看thread_pool模块的初始化
/*******************  nginx/src/core/ngx_thread_pool.c  ************************/
//创建线程池所需的基础结构
static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
    ngx_thread_pool_conf_t  *tcf;
     //从cycle->pool指向的内存池中申请一块内存
    tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
    if (tcf == NULL) {
        return NULL;
    }
     
     //先申请包含4个ngx_thread_pool_t指针类型元素的数组
     //ngx_thread_pool_t结构体中保存了一个线程池相关的信息
    if (ngx_array_init(&tcf->pools, cycle->pool, 4,
                       sizeof(ngx_thread_pool_t *))
        != NGX_OK)
    {
        return NULL;
    }
 
    return tcf;
}
 
//解析处理配置文件中thread_pool的配置,并将相关信息保存的ngx_thread_pool_t中
static char *  ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_str_t          *value;
    ngx_uint_t          i;
    ngx_thread_pool_t  *tp;
 
    value = cf->args->elts;
 
    //根据thread_pool配置中的name作为线程池的唯一标识(如果重名,只有第一个有效)
    //申请ngx_thread_pool_t结构保存线程池的相关信息
    //由此可见,nginx支持配置多个name不同的线程池
    tp = ngx_thread_pool_add(cf, &value[1]);
    .......
    //处理thread_pool配置行的所有元素
    for (i = 2; i < cf->args->nelts; i++) {
        //检查配置的线程数
        if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
         .......
        }
        
        //检查配置的最大队列长度
        if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
         .......
        }
    }
    ......
}
 
//判断包含多个线程池的数组中的各个线程池的配置是否正确
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
    ....
    ngx_thread_pool_t  **tpp;
 
    tpp = tcf->pools.elts;
    //遍历数组中所有的线程池配置,并检查其正确性
    for (i = 0; i < tcf->pools.nelts; i++) {
        .....
    }
 
    return NGX_CONF_OK;
}
 
     在上述的流程走完之后,nginx的master就保存了一份所有线程池的配置(tcf->pools),这份配置在创建worker时也会被继承。然后每个worker中都调用各个核心模块的init_process函数(如果有的话)。
/*******************  nginx/src/core/ngx_thread_pool.c  ************************/
//创建线程池所需的基础结构
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
    ngx_uint_t                i;
    ngx_thread_pool_t       **tpp;
    ngx_thread_pool_conf_t   *tcf;
    //如果不是worker或者只有一个worker就不起用线程池
    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
    {
        return NGX_OK;
    }
     
    //初始化任务队列
    ngx_thread_pool_queue_init(&ngx_thread_pool_done);
 
    tpp = tcf->pools.elts;
    for (i = 0; i < tcf->pools.nelts; i++) {
        //初始化各个线程池
        if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
            return NGX_ERROR;
        }
    }
 
    return NGX_OK;
}
 
//线程池初始化
static ngx_int_t  ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    .....
    //初始化任务队列
    ngx_thread_pool_queue_init(&tp->queue);
 
    //创建线程锁
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    }
 
    //创建线程条件变量
    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }
    ......
    for (n = 0; n < tp->threads; n++) {
        //创建线程池中的每个线程
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
        }
    }
    ......
}
 
//线程池中线程处理主函数
static void *ngx_thread_pool_cycle(void *data)
{
     ......
     for ( ;; ) {
        //阻塞的方式获取线程锁
        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
 
        /* the number may become negative */
        tp->waiting--;
 
        //如果任务队列为空,就cond_wait阻塞等待有新任务时调用cond_signal/broadcast触发
        while (tp->queue.first == NULL) {
            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
                != NGX_OK)
            {
                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
                return NULL;
            }
        }
        //从任务队列中获取task,并将其从队列中移除
        task = tp->queue.first;
        tp->queue.first = task->next;
 
        if (tp->queue.first == NULL) {
            tp->queue.last = &tp->queue.first;
        }
 
        if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
        ......
        //task的处理函数
        task->handler(task->ctx, tp->log);
        .....
 
        ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
 
        //将经过预处理的任务添加到done队列中等待调用event的回调函数继续处理
        *ngx_thread_pool_done.last = task;
        ngx_thread_pool_done.last = &task->next;
        
        //防止编译器优化,保证解锁操作是在上述语句执行完毕后再去执行的
        ngx_memory_barrier();
 
        ngx_unlock(&ngx_thread_pool_done_lock);
        
        (void) ngx_notify(ngx_thread_pool_handler);
    }
}
 
//处理pool_done队列上task中包含的每个event事件
static void  ngx_thread_pool_handler(ngx_event_t *ev)
{
    .....
    ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); 
 
    //获取任务链表的头部
    task = ngx_thread_pool_done.first;
    ngx_thread_pool_done.first = NULL;
    ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
 
    ngx_memory_barrier();
 
    ngx_unlock(&ngx_thread_pool_done_lock);
 
    while (task) {
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
                       "run completion handler for task #%ui", task->id);
        //遍历队列中的所有任务事件
        event = &task->event;
        task = task->next;
 
        event->complete = 1;
        event->active = 0;
 
        //调用event对应的处理函数有针对性的进行处理
        event->handler(event);
    }
}
 
三、thread_pool线程池使用示例
     根据之前所讲到的,nginx中的线程池主要是用于操作文件的IO操作。所以,在nginx中自带的模块ngx_http_file_cache.c文件中看到了线程池的使用。
/*********************** nginx/src/os/unix/ngx_files.c  **********************/
//file_cache模块的处理函数(涉及到了线程池)
static ssize_t  ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c)
{
    .......
#if (NGX_THREADS)
 
    if (clcf->aio == NGX_HTTP_AIO_THREADS) {
        c->file.thread_task = c->thread_task;
        //这里注册的函数在下面语句中的ngx_thread_read函数中被调用
        c->file.thread_handler = ngx_http_cache_thread_handler;
        c->file.thread_ctx = r;
        //根据任务的属性,选择正确的线程池,并初始化task结构体中的各个成员        
        n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);
 
        c->thread_task = c->file.thread_task;
        c->reading = (n == NGX_AGAIN);
 
        return n;
    }
#endif
 
    return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);
}
 
 
//task任务的处理函数
static ngx_int_t  ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
    .......
    tp = clcf->thread_pool;
    .......
    
    task->event.data = r;
    //注册thread_event_handler函数,该函数在处理pool_done队列中event事件时被调用
    task->event.handler = ngx_http_cache_thread_event_handler;
 
    //将任务放到线程池的任务队列中
    if (ngx_thread_task_post(tp, task) != NGX_OK) {
        return NGX_ERROR;
    }
    ......
}
 
/*********************** nginx/src/core/ngx_thread_pool.c  **********************/
//添加任务到队列中
ngx_int_t  ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
    //如果当前的任务正在处理就退出
    if (task->event.active) {
        ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
                      "task #%ui already active", task->id);
        return NGX_ERROR;
    }
 
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }
    
    //判断当前线程池等待的任务数量与最大队列长度的关系
    if (tp->waiting >= tp->max_queue) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
 
        ngx_log_error(NGX_LOG_ERR, tp->log, 0,
                      "thread pool \"%V\" queue overflow: %i tasks waiting",
                      &tp->name, tp->waiting);
        return NGX_ERROR;
    }
    //激活任务
    task->event.active = 1;
 
    task->id = ngx_thread_pool_task_id++;
    task->next = NULL;
     
    //通知阻塞的线程有新事件加入,可以解除阻塞
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    }
 
    *tp->queue.last = task;
    tp->queue.last = &task->next;
 
    tp->waiting++;
 
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
 
    ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "task #%ui added to thread pool \"%V\"",
                   task->id, &tp->name);
 
    return NGX_OK;
}
 
    上面示例基本展示了nginx目前对线程池的使用方法,采用线程池来处理IO这类慢速操作可以提升worker的主线程的执行效率。当然,用户自己在开发模块时,也可以参照file_cache模块中使用线程池的方法来调用多线程提升程序性能。( 欢迎大家多多批评指正)
 

转载于:https://www.cnblogs.com/sxhlinux/p/6906490.html

相关文章:

  • 禁止选中文本JS
  • 【书签】一个leading, mobile-friendly的交互地图js lib
  • 图片轮播的手写代码
  • 入口文件 bootstrap / app.php
  • 深入Oracle的left join中on和where的区别详解
  • 怎样让百度快速收录的新方法
  • 把两个字段的和作为新的字段
  • vagrant
  • 为女票写的计算工作时间的SQL
  • bug优先级别
  • 分析器错误消息: 该配置节不能包含 CDATA 或文本元素。
  • Anroid开发中常用快捷键
  • linux学习 – shell脚本
  • Web性能测试工具推荐
  • openstack一键安装(最新版)
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • 《剑指offer》分解让复杂问题更简单
  • 【407天】跃迁之路——程序员高效学习方法论探索系列(实验阶段164-2018.03.19)...
  • Akka系列(七):Actor持久化之Akka persistence
  • el-input获取焦点 input输入框为空时高亮 el-input值非法时
  • JS实现简单的MVC模式开发小游戏
  • js学习笔记
  • MYSQL 的 IF 函数
  • php ci框架整合银盛支付
  • 基于Vue2全家桶的移动端AppDEMO实现
  • 前端临床手札——文件上传
  • 深度学习中的信息论知识详解
  • 数据结构java版之冒泡排序及优化
  • 怎么把视频里的音乐提取出来
  • Spring第一个helloWorld
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • 你学不懂C语言,是因为不懂编写C程序的7个步骤 ...
  • ​如何防止网络攻击?
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • (03)光刻——半导体电路的绘制
  • (09)Hive——CTE 公共表达式
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (14)Hive调优——合并小文件
  • (33)STM32——485实验笔记
  • (WSI分类)WSI分类文献小综述 2024
  • (八)光盘的挂载与解挂、挂载CentOS镜像、rpm安装软件详细学习笔记
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (二十四)Flask之flask-session组件
  • (附源码)springboot 校园学生兼职系统 毕业设计 742122
  • (六) ES6 新特性 —— 迭代器(iterator)
  • (免费领源码)Python#MySQL图书馆管理系统071718-计算机毕业设计项目选题推荐
  • (三分钟)速览传统边缘检测算子
  • (正则)提取页面里的img标签
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • (转)IOS中获取各种文件的目录路径的方法
  • *p++,*(p++),*++p,(*p)++区别?
  • .Net6 Api Swagger配置
  • .NetCore Flurl.Http 升级到4.0后 https 无法建立SSL连接
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)