参考自:http://blog.csdn.net/zouxinfox/article/details/3560891(邹鑫的专栏《一个Linux下C线程池的实现》)


线程池是为了弥补频繁创建及销毁线程开销太大这个缺点,特别是当一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的时候了。


小插曲:

之前看书说C++里面的结构体和类除了默认权限由 public 变成了private,没有其他区别。结果我还想当然以为C语言里面也是这样的。结果。。。

C的结构体和C++的结构体的不同之处:在C的结构体只能自定义数据类型,而不允许有函数,而C++中的结构体可以加入成员函数。


这里,我的线程池使用C的结构体实现。


线程池的实现关键在于线程池结构体以及相关的函数,代码及详细注释如下:

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
typedef void (*Fun) (void *arg);
//Arg,用于保存执行函数的参数
struct Arg
{
	int *data;
	int rowNum;
	int colNum;
	int index;
};
//task,保存任务的执行函数以及参数。next指针用于task链表
struct task
{
	//任务函数及参数
	Fun myfun;
	void *arg;
	struct task *next;	
};
//thread_pool
struct thread_pool
{
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	//任务链表
	struct task* taskHead;
	//是否销毁线程池
	bool isClose;
	//线程池的线程数量
	int threadNum;
	//各个线程的线程ID
	pthread_t *threadId;
};

void pool_init(int _threadNum);			//初始线程池
void pool_add_task(Fun _myfun, void *_arg);	//向任务链表添加任务,并调用pthread_cond_signal()唤醒某一个被挂起的线程
void pool_destroy();				//销毁线程池
void* thread_run(void *_arg);			//pthread_create函数所调用的函数,很关键,用于管理各线程的实际运行

static struct thread_pool *pool = NULL;

void pool_init(int _threadNum)
{
	pool = (struct thread_pool*) malloc(sizeof(struct thread_pool));
	assert(pool != NULL);

	pthread_mutex_init(&(pool -> mutex), NULL);
	pthread_cond_init(&(pool -> cond), NULL);
	pool -> taskHead = NULL;
	pool -> isClose = false;
	pool -> threadNum = _threadNum;
	pool -> threadId = (pthread_t *) malloc(sizeof(pthread_t) * pool -> threadNum);
	int i;
	for(i = 0; i < pool -> threadNum; ++i)
		pthread_create(&(pool -> threadId[i]), NULL, thread_run, NULL);
}
	
void pool_add_task(Fun _myfun, void *_arg)
{
	//构造一个新任务
	struct task* newTask = (struct task*) malloc(sizeof(struct task));
	newTask -> myfun = _myfun;
	newTask -> arg = _arg;
	newTask -> next = NULL;			//别忘置空
	
	//将任务加到任务链表中
	pthread_mutex_lock(&(pool -> mutex));
	struct task* head = pool -> taskHead;
	if(head == NULL)
		pool -> taskHead = newTask;
	else
	{
		while(head -> next != NULL)
			head = head -> next;
		head -> next = newTask;
	}
	pthread_mutex_unlock(&(pool -> mutex));
	pthread_cond_signal(&(pool -> cond));
}

void pool_destroy()
{
	if(pool -> isClose == true)		//防止多次调用该函数
		return;
	pool -> isClose = true;
	//唤醒所有等待线程,然后销毁线程池
	pthread_cond_broadcast(&(pool -> cond));
	
	//回收线程
	int i;
	for(i = 0; i < pool -> threadNum; ++i)
		pthread_join(pool -> threadId[i], NULL);
	free(pool -> threadId);

	//销毁任务链表
	struct task* tmpTask;
	while(pool -> taskHead != NULL)
	{
		tmpTask = pool -> taskHead;
		pool -> taskHead = pool -> taskHead -> next;
		free(tmpTask);
	}

	//销毁条件变量与互斥量
	pthread_mutex_destroy(&(pool -> mutex));
	pthread_cond_destroy(&(pool -> cond));

	free(pool);
	//释放内存后将指针置空
	pool = NULL;
}

void* thread_run(void *_arg)
{
	printf("thread %d is ready\n", pthread_self());
	struct task *curTask;
	while(1)
	{	
		pthread_mutex_lock(&(pool -> mutex));
		if(pool -> isClose == false && pool -> taskHead == NULL)
		{
			printf("thread %d is waiting\n", pthread_self());
			pthread_cond_wait(&(pool -> cond), &(pool -> mutex));
		}
		if(pool -> isClose == true && pool -> taskHead == NULL)		//销毁线程池时保证任务链表已空
		{
			pthread_mutex_unlock(&(pool -> mutex));
			printf("thread %d is over\n", pthread_self());
			pthread_exit(NULL);
		}
		printf("thread %d is going to work\n", pthread_self());
		assert(pool -> taskHead != NULL);

		curTask = pool -> taskHead;
		pool -> taskHead = pool -> taskHead -> next;
		pthread_mutex_unlock(&(pool -> mutex));
		//执行任务函数
		(curTask -> myfun)(curTask -> arg);
		free(curTask);
		curTask = NULL;
	}
}

void fun(void *arg)
{
	int i, j;
	struct Arg *myArg = (struct Arg *) arg;
	printf("thread %d is working for row %d\n", pthread_self(), myArg -> index);
	for(i = myArg -> rowNum * myArg -> index, j = 0; j < myArg -> rowNum; ++j, ++i)
		myArg -> data[i] = myArg -> index;
	sleep(1);				//延长任务的执行时间
	return;
}

int main()
{
	int rowNum = 10, colNum = 10;
	int i, j;
	int *data = (int *) malloc(sizeof(int) * rowNum * colNum);
	memset(data, 0, sizeof(int) * rowNum * colNum);
	pool_init(3);
	//保证线程池已经生成
	sleep(3);
	struct Arg arg[rowNum];
	for(i = 0; i < rowNum; ++i)
	{
		arg[i].data = data;
		arg[i].rowNum = rowNum;
		arg[i].colNum = colNum;
		arg[i].index = i;
		pool_add_task(fun, (void *)&arg[i]);
	}
	//保证所有任务已经完成
	sleep(3);
	pool_destroy();
	for(i = 0; i < rowNum; ++i)
	{
		for(j = 0; j < colNum; ++j)
			printf("%d ", data[i * rowNum + j]);
		printf("\n");
	}
	free(data);
	return 0;
}



Logo

更多推荐