c++版线程池和任务池示例
commondef.h
//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
constintCHECK_IDLE_TASK_INTERVAL=300;
//单位秒,任务自动销毁时间间隔
constintTASK_DESTROY_INTERVAL=60;
//监控线程池是否为空时间间隔,微秒
constintIDLE_CHECK_POLL_EMPTY=500;
//线程池线程空闲自动退出时间间隔,5分钟
constint THREAD_WAIT_TIME_OUT=300;
taskpool.cpp
#include"taskpool.h"
#include<string.h>
#include<stdio.h>
#include<pthread.h>
TaskPool::TaskPool(constint&poolMaxSize)
:m_poolSize(poolMaxSize)
,m_taskListSize(0)
,m_bStop(false)
{
pthread_mutex_init(&m_lock,NULL);
pthread_mutex_init(&m_idleMutex,NULL);
pthread_cond_init(&m_idleCond,NULL);
pthread_attr_tattr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);//让线程独立运行
pthread_create(&m_idleId,&attr,CheckIdleTask,this);//创建监测空闲任务进程
pthread_attr_destroy(&attr);
}
TaskPool::~TaskPool()
{
if(!m_bStop)
{
StopPool();
}
if(!m_taskList.empty())
{
std::list<Task*>::iteratorit=m_taskList.begin();
for(;it!=m_taskList.end();++it)
{
if(*it!=NULL)
{
delete*it;
*it=NULL;
}
}
m_taskList.clear();
m_taskListSize=0;
}
if(!m_idleList.empty())
{
std::list<Task*>::iteratorit=m_idleList.begin();
for(;it!=m_idleList.end();++it)
{
if(*it!=NULL)
{
delete*it;
*it=NULL;
}
}
m_idleList.clear();
}
pthread_mutex_destroy(&m_lock);
pthread_mutex_destroy(&m_idleMutex);
pthread_cond_destroy(&m_idleCond);
}
void*TaskPool::CheckIdleTask(void*arg)
{
TaskPool*pool=(TaskPool*)arg;
while(1)
{
pool->LockIdle();
pool->RemoveIdleTask();
if(pool->GetStop())
{
pool->UnlockIdle();
break;
}
pool->CheckIdleWait();
pool->UnlockIdle();
}
}
voidTaskPool::StopPool()
{
m_bStop=true;
LockIdle();
pthread_cond_signal(&m_idleCond);//防止监控线程正在等待,而引起无法退出的问题
UnlockIdle();
pthread_join(m_idleId,NULL);
}
boolTaskPool::GetStop()
{
returnm_bStop;
}
voidTaskPool::CheckIdleWait()
{
structtimespectimeout;
memset(&timeout,0,sizeof(timeout));
timeout.tv_sec=time(0)+CHECK_IDLE_TASK_INTERVAL;
timeout.tv_nsec=0;
pthread_cond_timedwait(&m_idleCond,&m_idleMutex,&timeout);
}
intTaskPool::RemoveIdleTask()
{
intiRet=0;
std::list<Task*>::iteratorit,next;
std::list<Task*>::reverse_iteratorrit=m_idleList.rbegin();
time_tcurTime=time(0);
for(;rit!=m_idleList.rend();)
{
it=--rit.base();
if(difftime(curTime,((*it)->last_time))>=TASK_DESTROY_INTERVAL)
{
iRet++;
delete*it;
*it=NULL;
next=m_idleList.erase(it);
rit=std::list<Task*>::reverse_iterator(next);
}
else
{
break;
}
}
}
intTaskPool::AddTask(task_funfun,void*arg)
{
intiRet=0;
if(0!=fun)
{
pthread_mutex_lock(&m_lock);
if(m_taskListSize>=m_poolSize)
{
pthread_mutex_unlock(&m_lock);
iRet=-1;//taskpoolisfull;
}
else
{
pthread_mutex_unlock(&m_lock);
Task*task=GetIdleTask();
if(NULL==task)
{
task=newTask;
}
if(NULL==task)
{
iRet=-2;//newfailed
}
else
{
task->fun=fun;
task->data=arg;
pthread_mutex_lock(&m_lock);
m_taskList.push_back(task);
++m_taskListSize;
pthread_mutex_unlock(&m_lock);
}
}
}
returniRet;
}
Task*TaskPool::GetTask()
{
Task*task=NULL;
pthread_mutex_lock(&m_lock);
if(!m_taskList.empty())
{
task= m_taskList.front();
m_taskList.pop_front();
--m_taskListSize;
}
pthread_mutex_unlock(&m_lock);
returntask;
}
voidTaskPool::LockIdle()
{
pthread_mutex_lock(&m_idleMutex);
}
voidTaskPool::UnlockIdle()
{
pthread_mutex_unlock(&m_idleMutex);
}
Task*TaskPool::GetIdleTask()
{
LockIdle();
Task*task=NULL;
if(!m_idleList.empty())
{
task=m_idleList.front();
m_idleList.pop_front();
}
UnlockIdle();
returntask;
}
voidTaskPool::SaveIdleTask(Task*task)
{
if(NULL!=task)
{
task->fun=0;
task->data=NULL;
task->last_time=time(0);
LockIdle();
m_idleList.push_front(task);
UnlockIdle();
}
}
taskpool.h
#include<list> //所有的用户操作为一个task, //任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池 /*pur@添加任务到任务队列的尾部 /*pur@保存空闲任务到空闲队列中 threadpool.cpp #include"threadpool.h" /* Thread::Thread(booldetach,ThreadPool*pool) pthread_mutex_init(&m_mutex,NULL);//初始化互斥量 Thread::~Thread() ThreadPool::ThreadPool() ThreadPool::~ThreadPool() intThreadPool::InitPool(constint&poolMax,constint&poolPre) intiRet=0; voidThreadPool::GetThreadRun(task_funfun,void*arg) Thread*thread=m_threads.front(); pthread_mutex_lock(&thread->m_mutex); intThreadPool::Run(task_funfun,void*arg) voidThreadPool::StopPool(boolbStop) boolThreadPool::GetStop() Thread*ThreadPool::CreateThread() void*ThreadPool::WapperFun(void*arg) if(true==pool->GetStop()) pool->LockMutex(); voidThreadPool::SaveIdleThread(Thread*thread) intThreadPool::TotalThreads() void*ThreadPool::TerminalCheck(void*arg) usleep(IDLE_CHECK_POLL_EMPTY); voidThreadPool::TerminalCondSignal() voidThreadPool::RemoveThread(Thread*thread) voidThreadPool::LockMutex() voidThreadPool::UnlockMutex() voidThreadPool::IncreaseTotalNum() threadpool.h #include<list> //线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中 /*pur@初始化线程池 /*pur@执行一个任务 /*pur@从线程池中获取一个一个线程运行任务 staticvoid*WapperFun(void*); private: pthread_mutex_tm_terminalMutex;//终止所有线程互斥量 threadpoolmanager.cpp #include<errno.h> /*#include<string.h> /*memset(&time_beg,0,sizeof(structtimeval)); ThreadPoolManager::~ThreadPoolManager() pthread_cond_destroy(&m_cond_task); /*gettimeofday(&time_end,NULL); intThreadPoolManager::Init( if(0>m_threadPool->InitPool(threadPoolMax,threadPoolPre)) voidThreadPoolManager::StopAll() voidThreadPoolManager::LockTask() voidThreadPoolManager::UnlockTask() void*ThreadPoolManager::TaskThread(void*arg) if(manager->GetStop()) ThreadPool*ThreadPoolManager::GetThreadPool() TaskPool*ThreadPoolManager::GetTaskPool() int ThreadPoolManager::Run(task_funfun,void*arg) if(iRet==0&&(0==pthread_mutex_trylock(&m_mutex_task))) boolThreadPoolManager::GetStop() voidThreadPoolManager::TaskCondWait() pthread_cond_timedwait(&m_cond_task,&m_mutex_task,&to);//60秒超时 threadpoolmanager.h #include<pthread.h> classThreadPool; classThreadPoolManager /*pur@初始化线程池与任务池,threadPoolMax>threadPoolPre>threadPoolMin>=0 /*pur@执行一个任务 private: main.cpp intmain(intargc,char**argv) pthread_mutex_init(&m_mutex,NULL);
#ifndefTASKPOOL_H
#defineTASKPOOL_H
/*purpose@任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
* 任务池可自动销毁长时间空闲的Task对象
* 可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
* TASK_DESTROY_INTERVAL设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
*date @2013.12.23
*author @haibin.wang
*/
#include<pthread.h>
#include"commondef.h"
typedefvoid(*task_fun)(void*);
structTask
{
task_funfun;//任务处理函数
void*data;//任务处理数据
time_tlast_time;//加入空闲队列的时间,用于自动销毁
};
classTaskPool
{
public:
/*pur@初始化任务池,启动任务池空闲队列自动销毁线程
*para@maxSize最大任务数,大于0
*/
TaskPool(constint&poolMaxSize);
~TaskPool();
*para@task,具体任务
*return@0添加成功,负数添加失败
*/
intAddTask(task_funfun,void*arg);
/*pur@从任务列表的头获取一个任务
*return@ 如果列表中有任务则返回一个Task指针,否则返回一个NULL
*/
Task*GetTask();
*para@task已被调用执行的任务
*return@
*/
voidSaveIdleTask(Task*task);
voidStopPool();
public:
voidLockIdle();
voidUnlockIdle();
voidCheckIdleWait();
intRemoveIdleTask();
boolGetStop();
private:
staticvoid*CheckIdleTask(void*);
/*pur@获取空闲的task
*para@
*para@
*return@NULL说明没有空闲的,否则从m_idleList中获取一个
*/
Task*GetIdleTask();
intGetTaskSize();
private:
intm_poolSize;//任务池大小
intm_taskListSize;//统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
boolm_bStop;//是否停止
std::list<Task*>m_taskList;//所有待处理任务列表
std::list<Task*>m_idleList;//所有空闲任务列表
pthread_mutex_tm_lock;//对任务列表进行加锁,保证每次只能取一个任务
pthread_mutex_tm_idleMutex;//空闲任务队列锁
pthread_cond_tm_idleCond;//空闲队列等待条件
pthread_tm_idleId;;
};
#endif
/*purpose@线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
*date @2014.01.03
*author @haibin.wang
*/
#include<errno.h>
#include<string.h>
#include<iostream>
#include<stdio.h>
*/
:m_pool(pool)
{
pthread_attr_init(&m_attr);
if(detach)
{
pthread_attr_setdetachstate(&m_attr,PTHREAD_CREATE_DETACHED);//让线程独立运行
}
else
{
pthread_attr_setdetachstate(&m_attr,PTHREAD_CREATE_JOINABLE);
}
pthread_cond_init(&m_cond,NULL);//初始化条件变量
task.fun=0;
task.data=NULL;
}
{
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_mutex);
pthread_attr_destroy(&m_attr);
}
:m_poolMax(0)
,m_idleNum(0)
,m_totalNum(0)
,m_bStop(false)
{
pthread_mutex_init(&m_mutex,NULL);
pthread_mutex_init(&m_runMutex,NULL);
pthread_mutex_init(&m_terminalMutex,NULL);
pthread_cond_init(&m_terminalCond,NULL);
pthread_cond_init(&m_emptyCond,NULL);
}
{
/*if(!m_threads.empty())
{
std::list<Thread*>::iteratorit=m_threads.begin();
for(;it!=m_threads.end();++it)
{
if(*it!=NULL)
{
pthread_cond_destroy(&((*it)->m_cond));
pthread_mutex_destroy(&((*it)->m_mutex));
delete*it;
*it=NULL;
}
}
m_threads.clear();
}*/
pthread_mutex_destroy(&m_runMutex);
pthread_mutex_destroy(&m_terminalMutex);
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_terminalCond);
pthread_cond_destroy(&m_emptyCond);
}
{
if(poolMax<poolPre
||poolPre<0
||poolMax<=0)
{
return-1;
}
m_poolMax=poolMax;
for(inti=0;i<poolPre;++i)
{
Thread*thread=CreateThread();
if(NULL==thread)
{
iRet=-2;
}
}
if(iRet<0)
{
std::list<Thread*>::iteratorit=m_threads.begin();
for(;it!=m_threads.end();++it)
{
if(NULL!=(*it))
{
delete*it;
*it=NULL;
}
}
m_threads.clear();
m_totalNum=0;
}
returniRet;
}
{
//从线程池中获取一个线程
pthread_mutex_lock(&m_mutex);
if(m_threads.empty())
{
pthread_cond_wait(&m_emptyCond,&m_mutex);//阻塞等待有空闲线程
}
m_threads.pop_front();
pthread_mutex_unlock(&m_mutex);
thread->task.fun=fun;
thread->task.data=arg;
pthread_cond_signal(&thread->m_cond);//触发线程WapperFun循环执行
pthread_mutex_unlock(&thread->m_mutex);
}
{
pthread_mutex_lock(&m_runMutex);//保证每次只能由一个线程执行
intiRet=0;
if(m_totalNum<m_poolMax)//
{
if(m_threads.empty()&&(NULL==CreateThread()))
{
iRet=-1;//cannotcreatenewthread!
}
else
{
GetThreadRun(fun,arg);
}
}
else
{
GetThreadRun(fun,arg);
}
pthread_mutex_unlock(&m_runMutex);
returniRet;
}
{
m_bStop=bStop;
if(bStop)
{
//启动监控所有空闲线程是否退出的线程
Threadthread(false,this);
pthread_create(&thread.m_threadId,&thread.m_attr,ThreadPool::TerminalCheck,&thread);//启动监控所有线程退出线程
//阻塞等待所有空闲线程退出
pthread_join(thread.m_threadId,NULL);
}
/*if(bStop)
{
pthread_mutex_lock(&m_terminalMutex);
//启动监控所有空闲线程是否退出的线程
Threadthread(true,this);
pthread_create(&thread.m_threadId,&thread.m_attr,ThreadPool::TerminalCheck,&thread);//启动监控所有线程退出线程
//阻塞等待所有空闲线程退出
pthread_cond_wait(&m_terminalCond,&m_terminalMutex);
pthread_mutex_unlock(&m_terminalMutex);
}*/
}
{
returnm_bStop;
}
{
Thread*thread=NULL;
thread=newThread(true,this);
if(NULL!=thread)
{
intiret=pthread_create(&thread->m_threadId,&thread->m_attr,ThreadPool::WapperFun,thread);//通过WapperFun将线程加入到空闲队列中
if(0!=iret)
{
deletethread;
thread=NULL;
}
}
returnthread;
}
{
Thread*thread=(Thread*)arg;
if(NULL==thread||NULL==thread->m_pool)
{
returnNULL;
}
ThreadPool*pool=thread->m_pool;
pool->IncreaseTotalNum();
structtimespecabstime;
memset(&abstime,0,sizeof(abstime));
while(1)
{
if(0!=thread->task.fun)
{
thread->task.fun(thread->task.data);
}
{
break;//确定当前任务执行完毕后再判定是否退出线程
}
pthread_mutex_lock(&thread->m_mutex);
pool->SaveIdleThread(thread);//将线程加入到空闲队列中
abstime.tv_sec=time(0)+THREAD_WAIT_TIME_OUT;
abstime.tv_nsec=0;
if(ETIMEDOUT ==pthread_cond_timedwait(&thread->m_cond,&thread->m_mutex,&abstime))//等待线程被唤醒或超时自动退出
{
pthread_mutex_unlock(&thread->m_mutex);
break;
}
pthread_mutex_unlock(&thread->m_mutex);
}
pool->DecreaseTotalNum();
if(thread!=NULL)
{
pool->RemoveThread(thread);
deletethread;
thread=NULL;
}
pool->UnlockMutex();
return0;
}
{
if(thread)
{
thread->task.fun=0;
thread->task.data=NULL;
LockMutex();
if(m_threads.empty())
{
pthread_cond_broadcast(&m_emptyCond);//发送不空的信号,告诉run函数线程队列已经不空了
}
m_threads.push_front(thread);
UnlockMutex();
}
}
{
returnm_totalNum;
}
voidThreadPool::SendSignal()
{
LockMutex();
std::list<Thread*>::iteratorit=m_threads.begin();
for(;it!=m_threads.end();++it)
{
pthread_mutex_lock(&(*it)->m_mutex);
pthread_cond_signal(&((*it)->m_cond));
pthread_mutex_unlock(&(*it)->m_mutex);
}
UnlockMutex();
}
{
Thread*thread=(Thread*)arg;
if(NULL==thread||NULL==thread->m_pool)
{
returnNULL;
}
ThreadPool*pool=thread->m_pool;
while((false==pool->GetStop())||pool->TotalThreads()>0)
{
pool->SendSignal();
}
//pool->TerminalCondSignal();
return0;
}
{
pthread_cond_signal(&m_terminalCond);
}
{
m_threads.remove(thread);
}
{
pthread_mutex_lock(&m_mutex);
}
{
pthread_mutex_unlock(&m_mutex);
}
{
LockMutex();
m_totalNum++;
UnlockMutex();
}
voidThreadPool::DecreaseTotalNum()
{
m_totalNum--;
}
#ifndefTHREADPOOL_H
#defineTHREADPOOL_H
/*purpose@线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
* 当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
*date @2013.12.23
*author @haibin.wang
*/
#include<string>
#include"taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出
classThreadPool;
classThread
{
public:
Thread(booldetach,ThreadPool*pool);
~Thread();
pthread_t m_threadId;//线程id
pthread_mutex_tm_mutex;//互斥锁
pthread_cond_tm_cond;//条件变量
pthread_attr_tm_attr;//线程属性
Task task;//
ThreadPool*m_pool;//所属线程池
};
classThreadPool
{
public:
ThreadPool();
~ThreadPool();
*para@poolMax线程池最大线程数
*para@poolPre预创建线程数
*return@0:成功
* -1:parametererror,mustpoolMax>poolPre>=0
* -2:创建线程失败
*/
intInitPool(constint&poolMax,constint&poolPre);
*para@task任务指针
*return@0任务分配成功,负值任务分配失败,-1,创建新线程失败
*/
intRun(task_funfun,void*arg);
/*pur@设置是否停止线程池工作
*para@bStoptrue停止,false不停止
*/
voidStopPool(boolbStop);
public://此公有函数主要用于静态函数调用
/*pur@获取进程池的启停状态
*return@
*/
boolGetStop();
voidSaveIdleThread(Thread*thread);
voidLockMutex();
voidUnlockMutex();
voidDecreaseTotalNum();
voidIncreaseTotalNum();
voidRemoveThread(Thread*thread);
voidTerminalCondSignal();
intTotalThreads();
voidSendSignal();
private:
/*pur@创建线程
*return@非空成功,NULL失败,
*/
Thread*CreateThread();
*para@fun函数指针
*para@arg函数参数
*return@
*/
voidGetThreadRun(task_funfun,void*arg);
staticvoid*TerminalCheck(void*);//循环监测是否所有线程终止线程
intm_poolMax;//线程池最大线程数
intm_idleNum;//空闲线程数
intm_totalNum;//当前线程总数小于最大线程数
boolm_bStop;//是否停止线程池
pthread_mutex_tm_mutex;//线程列表锁
pthread_mutex_tm_runMutex;//run函数锁
pthread_cond_t m_terminalCond;//终止所有线程条件变量
pthread_cond_t m_emptyCond;//空闲线程不空条件变量
std::list<Thread*>m_threads;//线程列表
};
#endif
#include"threadpoolmanager.h"
#include"threadpool.h"
#include"taskpool.h"
#include<string.h>
#include<sys/time.h>
#include<stdio.h>*/
// structtimevaltime_beg,time_end;
ThreadPoolManager::ThreadPoolManager()
:m_threadPool(NULL)
,m_taskPool(NULL)
,m_bStop(false)
{
pthread_mutex_init(&m_mutex_task,NULL);
pthread_cond_init(&m_cond_task,NULL);
memset(&time_end,0,sizeof(structtimeval));
gettimeofday(&time_beg,NULL);*/
}
{
StopAll();
if(NULL!=m_threadPool)
{
deletem_threadPool;
m_threadPool=NULL;
}
if(NULL!=m_taskPool)
{
deletem_taskPool;
m_taskPool=NULL;
}
pthread_mutex_destroy(&m_mutex_task);
longtotal=(time_end.tv_sec-time_beg.tv_sec)*1000000+(time_end.tv_usec-time_beg.tv_usec);
printf("managertotaltime=%d\n",total);
gettimeofday(&time_beg,NULL);*/
}
constint&tastPoolSize,
constint&threadPoolMax,
constint&threadPoolPre)
{
m_threadPool=newThreadPool();
if(NULL==m_threadPool)
{
return-1;
}
m_taskPool=newTaskPool(tastPoolSize);
if(NULL==m_taskPool)
{
return-2;
}
{
return-3;
}
//启动线程池
//启动任务池
//启动任务获取线程,从任务池中不断拿任务到线程池中
pthread_attr_tattr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
pthread_create(&m_taskThreadId,&attr,TaskThread,this);//创建获取任务进程
pthread_attr_destroy(&attr);
return0;
}
{
m_bStop=true;
LockTask();
pthread_cond_signal(&m_cond_task);
UnlockTask();
pthread_join(m_taskThreadId,NULL);
//等待当前所有任务执行完毕
m_taskPool->StopPool();
m_threadPool->StopPool(true);//停止线程池工作
}
{
pthread_mutex_lock(&m_mutex_task);
}
{
pthread_mutex_unlock(&m_mutex_task);
}
{
ThreadPoolManager*manager=(ThreadPoolManager*)arg;
while(1)
{
manager->LockTask();//防止任务没有执行完毕发送了停止信号
while(1)//将任务队列中的任务执行完再退出
{
Task*task=manager->GetTaskPool()->GetTask();
if(NULL==task)
{
break;
}
else
{
manager->GetThreadPool()->Run(task->fun,task->data);
manager->GetTaskPool()->SaveIdleTask(task);
}
}
{
manager->UnlockTask();
break;
}
manager->TaskCondWait();//等待有任务的时候执行
manager->UnlockTask();
}
return0;
}
{
returnm_threadPool;
}
{
returnm_taskPool;
}
{
if(0==fun)
{
return0;
}
if(!m_bStop)
{
intiRet= m_taskPool->AddTask(fun,arg);
{
pthread_cond_signal(&m_cond_task);
UnlockTask();
}
returniRet;
}
else
{
return-3;
}
}
{
returnm_bStop;
}
{
structtimespecto;
memset(&to,0,sizeofto);
to.tv_sec=time(0)+60;
to.tv_nsec=0;
}
#ifndefTHREADPOOLMANAGER_H
#defineTHREADPOOLMANAGER_H
/*purpose@
* 基本流程:
* 管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
* 基本功能:
* 1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
* 2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
* 3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
* 线程资源:
* 如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
* 当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
* 线程最大数为:1个TaskPool线程+1个manager任务调度线程+ThreadPool最大线程数+1个manager退出监控线程+1线程池所有线程退出监控线程
* 线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程+1个manager创建任务调度线程
* 使用方法:
* ThreadPoolManagermanager;
* manager.Init(100000,50,5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
* manager.run(fun,data);//添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
*
*date @2013.12.23
*author @haibin.wang
*
* 详细参数控制可以修改commondef.h中的相关变量值
*/
typedefvoid(*task_fun)(void*);
classTaskPool;
{
public:
ThreadPoolManager();
~ThreadPoolManager();
*para@tastPoolSize任务池大小
*para@threadPoolMax线程池最大线程数
*para@threadPoolPre预创建线程数
*return@0:初始化成功,负数初始化失败
* -1:创建线程池失败
* -2:创建任务池失败
* -3:线程池初始化失败
*/
intInit(constint&tastPoolSize,
constint&threadPoolMax,
constint&threadPoolPre);
*para@fun需要执行的函数指针
*para@argfun需要的参数,默认为NULL
*return@0任务分配成功,负数任务分配失败
* -1:任务池满
* -2:任务池new失败
* -3:manager已经发送停止信号,不再接收新任务
*/
intRun(task_funfun,void*arg=NULL);
public://以下public函数主要用于静态函数调用
boolGetStop();
voidTaskCondWait();
TaskPool*GetTaskPool();
ThreadPool*GetThreadPool();
voidLockTask();
voidUnlockTask();
voidLockFull();
staticvoid*TaskThread(void*);//任务处理线程
voidStopAll();
private:
ThreadPool*m_threadPool;//线程池
TaskPool*m_taskPool;//任务池
boolm_bStop;//是否终止管理器
pthread_tm_taskThreadId;//TaskThread线程id
pthread_mutex_tm_mutex_task;
pthread_cond_tm_cond_task;
};
#endif
#include<iostream>
#include<string>
#include"threadpoolmanager.h"
#include<sys/time.h>
#include<string.h>
#include<stdlib.h>
#include<pthread.h>
usingnamespacestd;
intseq=0;
intbillNum=0;
intinter=1;
pthread_mutex_tm_mutex;
voidmyFunc(void*arg)
{
pthread_mutex_lock(&m_mutex);
seq++;
if(seq%inter==0)
{
cout<<"fun1="<<seq<<endl;
}
if(seq>=1000000000)
{
cout<<"billion"<<endl;
seq=0;
billNum++;
}
pthread_mutex_unlock(&m_mutex);
//sleep();
}
{
if(argc!=6)
{
cout<<"必须有5个参数任务执行次数任务池大小线程池大小预创建线程数输出间隔"<<endl;
cout<<"eg:./test999999100001001020"<<endl;
cout<<"上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999"<<endl;
return0;
}
doubleloopSize=atof(argv[1]);
inttaskSize=atoi(argv[2]);
intthreadPoolSize=atoi(argv[3]);
intpreSize=atoi(argv[4]);
inter=atoi(argv[5]);
ThreadPoolManagermanager;
if(0>manager.Init(taskSize, threadPoolSize,preSize))
{
cout<<"初始化失败"<<endl;
return0;
}
cout<<"*******************初始化完成*********************"<<endl;
structtimevaltime_beg,time_end;
memset(&time_beg,0,sizeof(structtimeval));
memset(&time_end,0,sizeof(structtimeval));
gettimeofday(&time_beg,NULL);
doublei=0;
for(;i<loopSize;++i)
{
while(0>manager.Run(myFunc,NULL))
{
usleep(100);
}
}
gettimeofday(&time_end,NULL);
longtotal=(time_end.tv_sec-time_beg.tv_sec)*1000000+(time_end.tv_usec-time_beg.tv_usec);
cout<<"totaltime="<<total<<endl;
cout<<"totalnum="<<i <<"billionnum="<<billNum<<endl;
cout<<__FILE__<<"将关闭所有线程"<<endl;
//pthread_mutex_destroy(&m_mutex);
return0;
}相关文章