Linux中实现线程池
为了实现高并发服务器,试验了epoll+线程池的方式进行并发处理,效果并不理想。并发量主要受到线程池数目的约束,但是将线程池的实现记录如下:
参考网上相关文章,实现了线程池,方便以后使用。
/*threadpool.h*/
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<assert.h>
/*线程池会维护一个任务链表,每个thread_worker接头结构就是一个任务*/
typedef struct worker{
/*回调函数,任务运行时会调用此函数*/
void (*process)(void *arg);
void *arg; /*回调参数*/
struct worker *next;
}thread_worker;
/*线程池结构*/
typedef struct
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
/*链表结构*/
thread_worker *queue_head;
/*是否销毁线程池*/
int tp_stop;
//线程数组
pthread_t *threadid;
/*线程池中允许的活动线程数目*/
int max_thread_num;
/*当前等待队列的任务数目*/
int cur_queue_size;
}thread_pool;
//thread_pool *pool=NULL;
void pool_init(int max_thread_num);
/*向线程池的任务链表中加入一个任务*/
int pool_add_worker(void *(*process)(void *arg),void *arg);
int pool_destroy();
#endif
/*threadpool.c*/
#include "threadpool.h"
thread_pool *pool=NULL;
static void *thread_routine(void *arg);
void
pool_init(int max_thread_num){
pool=(thread_pool *)malloc(sizeof(thread_pool));
pthread_mutex_init(&(pool->queue_lock),NULL);
pthread_cond_init(&(pool->queue_ready),NULL);
pool->queue_head=NULL;
pool->max_thread_num=max_thread_num;
pool->cur_queue_size=0;
pool->tp_stop=0;
pool->threadid=(pthread_t *)malloc(max_thread_num *sizeof(pthread_t));
int i=0;
for(i=0;i<max_thread_num;i++){
pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL);
}
}
/*向线程池中加入任务*/
int pool_add_worker(void *(*process)(void *arg),void *arg){
/*构造一个新任务*/
thread_worker *newworker=
(thread_worker *)malloc(sizeof(thread_worker));
newworker->process=(void *)&process;
newworker->arg=arg;
newworker->next=NULL;
pthread_mutex_lock(&(pool->queue_lock));
/*将任务加入等待队列中*/
thread_worker *member=pool->queue_head;
if(member!=NULL){
while(member->next!=NULL)
member=member->next;
member->next=newworker;
}else{
pool->queue_head=newworker;
}
assert(pool->queue_head!=NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));
/*此时等待队列中有任务了,唤醒一个等待线程*/
pthread_cond_signal(&(pool->queue_ready));
return 0;
}
/*销毁线程池,等待队列中的任务不会再执行,
* 但是正在运行的线程会一直把任务运行完再退出*/
int pool_destroy(){
if(pool->tp_stop)
return -1;/*防止两次调用*/
pool->tp_stop=1;
/*唤醒所有等待线程,线程池要销毁了*/
pthread_cond_broadcast(&(pool->queue_ready));
/*阻塞等待线程退出,否则就成僵尸线程了*/
int i;
for(i=0;i<pool->max_thread_num;i++){
pthread_join(pool->threadid[i],NULL);
}
free(pool->threadid);
/*销毁等待队列*/
thread_worker *head=NULL;
while(pool->queue_head!=NULL){
head=pool->queue_head;
pool->queue_head=pool->queue_head->next;
free(head);
}
/*条件变量和互斥变量也需要销毁*/
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));
free(pool);
pool=NULL;
return 0;
}
static void *
thread_routine(void *arg){
printf("starting thread 0x%x\n",pthread_self());
while(1){
pthread_mutex_lock(&(pool->queue_lock));
/*如果等待队列为0,并且不销毁线程池,则处于阻塞状态
* */
while(pool->cur_queue_size==0 && !pool->tp_stop){
printf("thread 0x%x is waiting\n",pthread_self());
pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));
}
/*如果线程池要销毁了*/
if(pool->tp_stop){
/*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x%x will exit\n",pthread_self());
pthread_exit(NULL);
}
printf ("thread 0x%x is starting to work\n", pthread_self());
assert(pool->cur_queue_size!=0);
assert(pool->queue_head!=NULL);
/*等待队列长度减去1,并却出链表中的头元素*/
pool->cur_queue_size--;
thread_worker *work=pool->queue_head;
pool->queue_head=work->next;
pthread_mutex_unlock(&(pool->queue_lock));
/*调用回调函数,执行任务*/
work->process(work->arg);
free(work);
work=NULL;
}
/*这一句话应该是不可达的*/
pthread_exit(NULL);
}
相关文章:
Linux--线程编程
http://www.cnblogs.com/forstudy/archive/2012/04/05/2433853.html