【多线程编程】Linux下线程池C实现
参考自:http://blog.csdn.net/zouxinfox/article/details/3560891(邹鑫的专栏《一个Linux下C线程池的实现》)线程池是为了弥补频繁创建及销毁线程开销太大这个缺点,特别是当一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的时候了。小插曲:之前看书
·
参考自:http://blog.csdn.net/zouxinfox/article/details/3560891(邹鑫的专栏《一个Linux下C线程池的实现》)
线程池是为了弥补频繁创建及销毁线程开销太大这个缺点,特别是当一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的时候了。
小插曲:
之前看书说C++里面的结构体和类除了默认权限由 public 变成了private,没有其他区别。结果我还想当然以为C语言里面也是这样的。结果。。。
C的结构体和C++的结构体的不同之处:在C的结构体只能自定义数据类型,而不允许有函数,而C++中的结构体可以加入成员函数。
这里,我的线程池使用C的结构体实现。
线程池的实现关键在于线程池结构体以及相关的函数,代码及详细注释如下:
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
typedef void (*Fun) (void *arg);
//Arg,用于保存执行函数的参数
struct Arg
{
int *data;
int rowNum;
int colNum;
int index;
};
//task,保存任务的执行函数以及参数。next指针用于task链表
struct task
{
//任务函数及参数
Fun myfun;
void *arg;
struct task *next;
};
//thread_pool
struct thread_pool
{
pthread_mutex_t mutex;
pthread_cond_t cond;
//任务链表
struct task* taskHead;
//是否销毁线程池
bool isClose;
//线程池的线程数量
int threadNum;
//各个线程的线程ID
pthread_t *threadId;
};
void pool_init(int _threadNum); //初始线程池
void pool_add_task(Fun _myfun, void *_arg); //向任务链表添加任务,并调用pthread_cond_signal()唤醒某一个被挂起的线程
void pool_destroy(); //销毁线程池
void* thread_run(void *_arg); //pthread_create函数所调用的函数,很关键,用于管理各线程的实际运行
static struct thread_pool *pool = NULL;
void pool_init(int _threadNum)
{
pool = (struct thread_pool*) malloc(sizeof(struct thread_pool));
assert(pool != NULL);
pthread_mutex_init(&(pool -> mutex), NULL);
pthread_cond_init(&(pool -> cond), NULL);
pool -> taskHead = NULL;
pool -> isClose = false;
pool -> threadNum = _threadNum;
pool -> threadId = (pthread_t *) malloc(sizeof(pthread_t) * pool -> threadNum);
int i;
for(i = 0; i < pool -> threadNum; ++i)
pthread_create(&(pool -> threadId[i]), NULL, thread_run, NULL);
}
void pool_add_task(Fun _myfun, void *_arg)
{
//构造一个新任务
struct task* newTask = (struct task*) malloc(sizeof(struct task));
newTask -> myfun = _myfun;
newTask -> arg = _arg;
newTask -> next = NULL; //别忘置空
//将任务加到任务链表中
pthread_mutex_lock(&(pool -> mutex));
struct task* head = pool -> taskHead;
if(head == NULL)
pool -> taskHead = newTask;
else
{
while(head -> next != NULL)
head = head -> next;
head -> next = newTask;
}
pthread_mutex_unlock(&(pool -> mutex));
pthread_cond_signal(&(pool -> cond));
}
void pool_destroy()
{
if(pool -> isClose == true) //防止多次调用该函数
return;
pool -> isClose = true;
//唤醒所有等待线程,然后销毁线程池
pthread_cond_broadcast(&(pool -> cond));
//回收线程
int i;
for(i = 0; i < pool -> threadNum; ++i)
pthread_join(pool -> threadId[i], NULL);
free(pool -> threadId);
//销毁任务链表
struct task* tmpTask;
while(pool -> taskHead != NULL)
{
tmpTask = pool -> taskHead;
pool -> taskHead = pool -> taskHead -> next;
free(tmpTask);
}
//销毁条件变量与互斥量
pthread_mutex_destroy(&(pool -> mutex));
pthread_cond_destroy(&(pool -> cond));
free(pool);
//释放内存后将指针置空
pool = NULL;
}
void* thread_run(void *_arg)
{
printf("thread %d is ready\n", pthread_self());
struct task *curTask;
while(1)
{
pthread_mutex_lock(&(pool -> mutex));
if(pool -> isClose == false && pool -> taskHead == NULL)
{
printf("thread %d is waiting\n", pthread_self());
pthread_cond_wait(&(pool -> cond), &(pool -> mutex));
}
if(pool -> isClose == true && pool -> taskHead == NULL) //销毁线程池时保证任务链表已空
{
pthread_mutex_unlock(&(pool -> mutex));
printf("thread %d is over\n", pthread_self());
pthread_exit(NULL);
}
printf("thread %d is going to work\n", pthread_self());
assert(pool -> taskHead != NULL);
curTask = pool -> taskHead;
pool -> taskHead = pool -> taskHead -> next;
pthread_mutex_unlock(&(pool -> mutex));
//执行任务函数
(curTask -> myfun)(curTask -> arg);
free(curTask);
curTask = NULL;
}
}
void fun(void *arg)
{
int i, j;
struct Arg *myArg = (struct Arg *) arg;
printf("thread %d is working for row %d\n", pthread_self(), myArg -> index);
for(i = myArg -> rowNum * myArg -> index, j = 0; j < myArg -> rowNum; ++j, ++i)
myArg -> data[i] = myArg -> index;
sleep(1); //延长任务的执行时间
return;
}
int main()
{
int rowNum = 10, colNum = 10;
int i, j;
int *data = (int *) malloc(sizeof(int) * rowNum * colNum);
memset(data, 0, sizeof(int) * rowNum * colNum);
pool_init(3);
//保证线程池已经生成
sleep(3);
struct Arg arg[rowNum];
for(i = 0; i < rowNum; ++i)
{
arg[i].data = data;
arg[i].rowNum = rowNum;
arg[i].colNum = colNum;
arg[i].index = i;
pool_add_task(fun, (void *)&arg[i]);
}
//保证所有任务已经完成
sleep(3);
pool_destroy();
for(i = 0; i < rowNum; ++i)
{
for(j = 0; j < colNum; ++j)
printf("%d ", data[i * rowNum + j]);
printf("\n");
}
free(data);
return 0;
}
更多推荐
已为社区贡献1条内容
所有评论(0)