zl程序教程

您现在的位置是:首页 >  后端

当前栏目

c++版线程池和任务池示例

C++线程 示例 任务
2023-06-13 09:15:18 时间

commondef.h

复制代码代码如下:


//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
constintCHECK_IDLE_TASK_INTERVAL=300;
//单位秒,任务自动销毁时间间隔
constintTASK_DESTROY_INTERVAL=60;

//监控线程池是否为空时间间隔,微秒
constintIDLE_CHECK_POLL_EMPTY=500;

//线程池线程空闲自动退出时间间隔,5分钟
constint THREAD_WAIT_TIME_OUT=300;

taskpool.cpp

复制代码代码如下:


#include"taskpool.h"

#include<string.h>

#include<stdio.h>
#include<pthread.h>

   TaskPool::TaskPool(constint&poolMaxSize)
   :m_poolSize(poolMaxSize)
     ,m_taskListSize(0)
     ,m_bStop(false)
{
   pthread_mutex_init(&m_lock,NULL);
   pthread_mutex_init(&m_idleMutex,NULL);
   pthread_cond_init(&m_idleCond,NULL);

   pthread_attr_tattr;
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);//让线程独立运行
   pthread_create(&m_idleId,&attr,CheckIdleTask,this);//创建监测空闲任务进程
   pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
   if(!m_bStop)
   {
       StopPool();
   }
   if(!m_taskList.empty())
   {
       std::list<Task*>::iteratorit=m_taskList.begin();
       for(;it!=m_taskList.end();++it)
       {
           if(*it!=NULL)
           {
               delete*it;
               *it=NULL;
           }
       }
       m_taskList.clear();
       m_taskListSize=0;
   }
   if(!m_idleList.empty())
   {
       std::list<Task*>::iteratorit=m_idleList.begin();
       for(;it!=m_idleList.end();++it)
       {
           if(*it!=NULL)
           {
               delete*it;
               *it=NULL;
           }
       }
       m_idleList.clear();
   }


   pthread_mutex_destroy(&m_lock);
   pthread_mutex_destroy(&m_idleMutex);
   pthread_cond_destroy(&m_idleCond);
}

void*TaskPool::CheckIdleTask(void*arg)
{
   TaskPool*pool=(TaskPool*)arg;
   while(1)
   {
       pool->LockIdle();
       pool->RemoveIdleTask();
       if(pool->GetStop())
       {
           pool->UnlockIdle();
           break;
       }
       pool->CheckIdleWait();
       pool->UnlockIdle();
   }
}

voidTaskPool::StopPool()
{
   m_bStop=true;
   LockIdle();
   pthread_cond_signal(&m_idleCond);//防止监控线程正在等待,而引起无法退出的问题
   UnlockIdle();
   pthread_join(m_idleId,NULL);
}

boolTaskPool::GetStop()
{
   returnm_bStop;
}

voidTaskPool::CheckIdleWait()
{
   structtimespectimeout;
   memset(&timeout,0,sizeof(timeout));
   timeout.tv_sec=time(0)+CHECK_IDLE_TASK_INTERVAL;
   timeout.tv_nsec=0;
   pthread_cond_timedwait(&m_idleCond,&m_idleMutex,&timeout);
}

intTaskPool::RemoveIdleTask()
{
   intiRet=0;
   std::list<Task*>::iteratorit,next;
   std::list<Task*>::reverse_iteratorrit=m_idleList.rbegin();
   time_tcurTime=time(0);
   for(;rit!=m_idleList.rend();)
   {
       it=--rit.base();
       if(difftime(curTime,((*it)->last_time))>=TASK_DESTROY_INTERVAL)
       {
           iRet++;
           delete*it;
           *it=NULL;
           next=m_idleList.erase(it);
           rit=std::list<Task*>::reverse_iterator(next);
       }
       else
       {
           break;   
       }
   }
}

intTaskPool::AddTask(task_funfun,void*arg)
{
   intiRet=0;
   if(0!=fun)
   {
       pthread_mutex_lock(&m_lock);
       if(m_taskListSize>=m_poolSize)
       {
           pthread_mutex_unlock(&m_lock);
           iRet=-1;//taskpoolisfull;
       }
       else
       {
           pthread_mutex_unlock(&m_lock);
           Task*task=GetIdleTask();
           if(NULL==task)
           {
               task=newTask;
           }
           if(NULL==task)
           {
               iRet=-2;//newfailed
           }
           else
           {
               task->fun=fun;
               task->data=arg;
               pthread_mutex_lock(&m_lock);
               m_taskList.push_back(task);
               ++m_taskListSize;
               pthread_mutex_unlock(&m_lock);
           }
       }
   }
   returniRet;
}

Task*TaskPool::GetTask()
{
   Task*task=NULL;
   pthread_mutex_lock(&m_lock);
   if(!m_taskList.empty())
   {
       task= m_taskList.front();
       m_taskList.pop_front();
       --m_taskListSize;
   }
   pthread_mutex_unlock(&m_lock);
   returntask;
}

voidTaskPool::LockIdle()
{
   pthread_mutex_lock(&m_idleMutex);
}

voidTaskPool::UnlockIdle()
{
   pthread_mutex_unlock(&m_idleMutex);
}

Task*TaskPool::GetIdleTask()
{
   LockIdle();
   Task*task=NULL;
   if(!m_idleList.empty())
   {
       task=m_idleList.front();
       m_idleList.pop_front();
   }
   UnlockIdle();
   returntask;
}

voidTaskPool::SaveIdleTask(Task*task)
{
   if(NULL!=task)
   {
       task->fun=0;
       task->data=NULL;
       task->last_time=time(0);
       LockIdle();
       m_idleList.push_front(task);
       UnlockIdle();
   }
}

taskpool.h

复制代码代码如下:
#ifndefTASKPOOL_H
#defineTASKPOOL_H
/*purpose@任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
 *         任务池可自动销毁长时间空闲的Task对象
 *         可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
 *         TASK_DESTROY_INTERVAL设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
 *date   @2013.12.23
 *author @haibin.wang
 */

#include<list>
#include<pthread.h>
#include"commondef.h"

//所有的用户操作为一个task,
typedefvoid(*task_fun)(void*);
structTask
{
   task_funfun;//任务处理函数
   void*data;//任务处理数据
   time_tlast_time;//加入空闲队列的时间,用于自动销毁
};

//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池
classTaskPool
{
public:
 /*pur@初始化任务池,启动任务池空闲队列自动销毁线程
    *para@maxSize最大任务数,大于0
   */
   TaskPool(constint&poolMaxSize);
   ~TaskPool();

   /*pur@添加任务到任务队列的尾部
    *para@task,具体任务
    *return@0添加成功,负数添加失败
   */   
   intAddTask(task_funfun,void*arg);

   /*pur@从任务列表的头获取一个任务
    *return@ 如果列表中有任务则返回一个Task指针,否则返回一个NULL
   */   
   Task*GetTask();

   /*pur@保存空闲任务到空闲队列中
    *para@task已被调用执行的任务
    *return@
   */
   voidSaveIdleTask(Task*task);

   voidStopPool();
public:
   voidLockIdle();
   voidUnlockIdle();
   voidCheckIdleWait();
   intRemoveIdleTask();
   boolGetStop();
private:
   staticvoid*CheckIdleTask(void*);
   /*pur@获取空闲的task
    *para@
    *para@
    *return@NULL说明没有空闲的,否则从m_idleList中获取一个
   */
   Task*GetIdleTask();
   intGetTaskSize();
private:
   intm_poolSize;//任务池大小
   intm_taskListSize;//统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
   boolm_bStop;//是否停止
   std::list<Task*>m_taskList;//所有待处理任务列表
   std::list<Task*>m_idleList;//所有空闲任务列表
   pthread_mutex_tm_lock;//对任务列表进行加锁,保证每次只能取一个任务
   pthread_mutex_tm_idleMutex;//空闲任务队列锁
   pthread_cond_tm_idleCond;//空闲队列等待条件
   pthread_tm_idleId;;
};
#endif

threadpool.cpp

复制代码代码如下:
/*purpose@线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
 *date   @2014.01.03
 *author @haibin.wang
 */

#include"threadpool.h"
#include<errno.h>
#include<string.h>

/*
#include<iostream>
#include<stdio.h>
*/

Thread::Thread(booldetach,ThreadPool*pool)
   :m_pool(pool)
{
   pthread_attr_init(&m_attr);
   if(detach)
   {
       pthread_attr_setdetachstate(&m_attr,PTHREAD_CREATE_DETACHED);//让线程独立运行
   }
   else
   {
        pthread_attr_setdetachstate(&m_attr,PTHREAD_CREATE_JOINABLE);
   }

   pthread_mutex_init(&m_mutex,NULL);//初始化互斥量
   pthread_cond_init(&m_cond,NULL);//初始化条件变量
   task.fun=0;
   task.data=NULL;
}

Thread::~Thread()
{
   pthread_cond_destroy(&m_cond);
   pthread_mutex_destroy(&m_mutex);
   pthread_attr_destroy(&m_attr);
}

   ThreadPool::ThreadPool()
   :m_poolMax(0)
   ,m_idleNum(0)
   ,m_totalNum(0)
     ,m_bStop(false)
{
   pthread_mutex_init(&m_mutex,NULL);
   pthread_mutex_init(&m_runMutex,NULL);
   pthread_mutex_init(&m_terminalMutex,NULL);
   pthread_cond_init(&m_terminalCond,NULL);
   pthread_cond_init(&m_emptyCond,NULL);
}

ThreadPool::~ThreadPool()
{
   /*if(!m_threads.empty())
   {
       std::list<Thread*>::iteratorit=m_threads.begin();
       for(;it!=m_threads.end();++it)
       {
           if(*it!=NULL)
           {
               pthread_cond_destroy(&((*it)->m_cond));
               pthread_mutex_destroy(&((*it)->m_mutex));
               delete*it;
               *it=NULL;
           }
       }
       m_threads.clear();
   }*/
   pthread_mutex_destroy(&m_runMutex);
   pthread_mutex_destroy(&m_terminalMutex);
   pthread_mutex_destroy(&m_mutex);
   pthread_cond_destroy(&m_terminalCond);
   pthread_cond_destroy(&m_emptyCond);
}

intThreadPool::InitPool(constint&poolMax,constint&poolPre)
{
   if(poolMax<poolPre
           ||poolPre<0
           ||poolMax<=0)
   {
       return-1;
   }
   m_poolMax=poolMax;

   intiRet=0;
   for(inti=0;i<poolPre;++i)
   {
       Thread*thread=CreateThread();
       if(NULL==thread)
       {
           iRet=-2;
       }
   }

   if(iRet<0)
   { 
       std::list<Thread*>::iteratorit=m_threads.begin();
       for(;it!=m_threads.end();++it)
       {
           if(NULL!=(*it))
           {
               delete*it;
               *it=NULL;
           }
       }
       m_threads.clear();
       m_totalNum=0;
   }
   returniRet;
}

voidThreadPool::GetThreadRun(task_funfun,void*arg)
{
   //从线程池中获取一个线程
   pthread_mutex_lock(&m_mutex);
   if(m_threads.empty())
   {
       pthread_cond_wait(&m_emptyCond,&m_mutex);//阻塞等待有空闲线程
   }

   Thread*thread=m_threads.front();
   m_threads.pop_front();
   pthread_mutex_unlock(&m_mutex);

   pthread_mutex_lock(&thread->m_mutex);
   thread->task.fun=fun;
   thread->task.data=arg;       
   pthread_cond_signal(&thread->m_cond);//触发线程WapperFun循环执行
   pthread_mutex_unlock(&thread->m_mutex);
}

intThreadPool::Run(task_funfun,void*arg)
{
   pthread_mutex_lock(&m_runMutex);//保证每次只能由一个线程执行
   intiRet=0;
   if(m_totalNum<m_poolMax)//
   {
       if(m_threads.empty()&&(NULL==CreateThread()))
       {
           iRet=-1;//cannotcreatenewthread!
       }
       else
       {
           GetThreadRun(fun,arg);
       }
   }
   else
   {
       GetThreadRun(fun,arg);
   }
   pthread_mutex_unlock(&m_runMutex);
   returniRet;
}

voidThreadPool::StopPool(boolbStop)
{
   m_bStop=bStop;
   if(bStop)
   {
       //启动监控所有空闲线程是否退出的线程
       Threadthread(false,this);
       pthread_create(&thread.m_threadId,&thread.m_attr,ThreadPool::TerminalCheck,&thread);//启动监控所有线程退出线程
       //阻塞等待所有空闲线程退出
       pthread_join(thread.m_threadId,NULL);
   }
   /*if(bStop)
   {
       pthread_mutex_lock(&m_terminalMutex);
       //启动监控所有空闲线程是否退出的线程
       Threadthread(true,this);
       pthread_create(&thread.m_threadId,&thread.m_attr,ThreadPool::TerminalCheck,&thread);//启动监控所有线程退出线程
       //阻塞等待所有空闲线程退出
       pthread_cond_wait(&m_terminalCond,&m_terminalMutex);
       pthread_mutex_unlock(&m_terminalMutex);
   }*/
}

boolThreadPool::GetStop()
{
   returnm_bStop;
}

Thread*ThreadPool::CreateThread()
{
   Thread*thread=NULL;
   thread=newThread(true,this);
   if(NULL!=thread)
   {
       intiret=pthread_create(&thread->m_threadId,&thread->m_attr,ThreadPool::WapperFun,thread);//通过WapperFun将线程加入到空闲队列中
       if(0!=iret)
       {
           deletethread;
           thread=NULL;
       }
   }
   returnthread;
}

void*ThreadPool::WapperFun(void*arg)
{
   Thread*thread=(Thread*)arg;
   if(NULL==thread||NULL==thread->m_pool)
   {
       returnNULL;
   }
   ThreadPool*pool=thread->m_pool;
   pool->IncreaseTotalNum();
   structtimespecabstime;
   memset(&abstime,0,sizeof(abstime));
   while(1)
   {
       if(0!=thread->task.fun)
       {
           thread->task.fun(thread->task.data);
       }

       if(true==pool->GetStop()) 
       {
           break;//确定当前任务执行完毕后再判定是否退出线程
       }
       pthread_mutex_lock(&thread->m_mutex);
       pool->SaveIdleThread(thread);//将线程加入到空闲队列中
       abstime.tv_sec=time(0)+THREAD_WAIT_TIME_OUT;
       abstime.tv_nsec=0;
       if(ETIMEDOUT ==pthread_cond_timedwait(&thread->m_cond,&thread->m_mutex,&abstime))//等待线程被唤醒或超时自动退出
       {
           pthread_mutex_unlock(&thread->m_mutex);
           break;
       }
       pthread_mutex_unlock(&thread->m_mutex);
   }

   pool->LockMutex();
   pool->DecreaseTotalNum();
   if(thread!=NULL)
   {
       pool->RemoveThread(thread);
       deletethread;
       thread=NULL;
   }
   pool->UnlockMutex();
   return0;
}

voidThreadPool::SaveIdleThread(Thread*thread)
{
   if(thread)
   {
       thread->task.fun=0;
       thread->task.data=NULL;
       LockMutex();
       if(m_threads.empty())
       {
           pthread_cond_broadcast(&m_emptyCond);//发送不空的信号,告诉run函数线程队列已经不空了
       }
       m_threads.push_front(thread);
       UnlockMutex();
   }
}

intThreadPool::TotalThreads()
{
   returnm_totalNum;
}


voidThreadPool::SendSignal()
{
   LockMutex();
   std::list<Thread*>::iteratorit=m_threads.begin();
   for(;it!=m_threads.end();++it)
   {
       pthread_mutex_lock(&(*it)->m_mutex);
       pthread_cond_signal(&((*it)->m_cond));
       pthread_mutex_unlock(&(*it)->m_mutex);
   }
   UnlockMutex();
}

void*ThreadPool::TerminalCheck(void*arg)
{
   Thread*thread=(Thread*)arg;
   if(NULL==thread||NULL==thread->m_pool)
   {
       returnNULL;
   }
   ThreadPool*pool=thread->m_pool;
   while((false==pool->GetStop())||pool->TotalThreads()>0)
   {
       pool->SendSignal();

       usleep(IDLE_CHECK_POLL_EMPTY);
   }
   //pool->TerminalCondSignal();
   return0;
}

voidThreadPool::TerminalCondSignal()
{
   pthread_cond_signal(&m_terminalCond);
}

voidThreadPool::RemoveThread(Thread*thread)
{
   m_threads.remove(thread);
}

voidThreadPool::LockMutex()
{
   pthread_mutex_lock(&m_mutex);
}

voidThreadPool::UnlockMutex()
{
   pthread_mutex_unlock(&m_mutex);
}

voidThreadPool::IncreaseTotalNum()
{
   LockMutex();
   m_totalNum++;
   UnlockMutex();
}
voidThreadPool::DecreaseTotalNum()
{
   m_totalNum--;
}

threadpool.h

复制代码代码如下:
#ifndefTHREADPOOL_H
#defineTHREADPOOL_H
/*purpose@线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
 *         当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
 *date   @2013.12.23
 *author @haibin.wang
 */

#include<list>
#include<string>
#include"taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出


classThreadPool;
classThread
{
public:
   Thread(booldetach,ThreadPool*pool);
   ~Thread();
   pthread_t m_threadId;//线程id
   pthread_mutex_tm_mutex;//互斥锁
   pthread_cond_tm_cond;//条件变量
   pthread_attr_tm_attr;//线程属性
 Task task;//
   ThreadPool*m_pool;//所属线程池
};

//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中
classThreadPool
{
public:
   ThreadPool();
   ~ThreadPool();

   /*pur@初始化线程池
    *para@poolMax线程池最大线程数
    *para@poolPre预创建线程数
    *return@0:成功
    *         -1:parametererror,mustpoolMax>poolPre>=0
    *         -2:创建线程失败
   */
   intInitPool(constint&poolMax,constint&poolPre);

   /*pur@执行一个任务
    *para@task任务指针
    *return@0任务分配成功,负值任务分配失败,-1,创建新线程失败
   */
   intRun(task_funfun,void*arg);

 /*pur@设置是否停止线程池工作
    *para@bStoptrue停止,false不停止
   */
 voidStopPool(boolbStop);

public://此公有函数主要用于静态函数调用
   /*pur@获取进程池的启停状态
    *return@
   */
   boolGetStop();   
 voidSaveIdleThread(Thread*thread);
   voidLockMutex();
   voidUnlockMutex();
   voidDecreaseTotalNum();
   voidIncreaseTotalNum();
   voidRemoveThread(Thread*thread);
   voidTerminalCondSignal();
   intTotalThreads();
   voidSendSignal();
private:
 /*pur@创建线程
    *return@非空成功,NULL失败,
   */
 Thread*CreateThread();

   /*pur@从线程池中获取一个一个线程运行任务
    *para@fun函数指针
    *para@arg函数参数
    *return@
   */
   voidGetThreadRun(task_funfun,void*arg);

 staticvoid*WapperFun(void*);
 staticvoid*TerminalCheck(void*);//循环监测是否所有线程终止线程

private:
   intm_poolMax;//线程池最大线程数
   intm_idleNum;//空闲线程数
   intm_totalNum;//当前线程总数小于最大线程数 
 boolm_bStop;//是否停止线程池
 pthread_mutex_tm_mutex;//线程列表锁
 pthread_mutex_tm_runMutex;//run函数锁

   pthread_mutex_tm_terminalMutex;//终止所有线程互斥量
   pthread_cond_t m_terminalCond;//终止所有线程条件变量
   pthread_cond_t m_emptyCond;//空闲线程不空条件变量

   std::list<Thread*>m_threads;//线程列表
};
#endif

threadpoolmanager.cpp

复制代码代码如下:
#include"threadpoolmanager.h"
#include"threadpool.h"
#include"taskpool.h"

#include<errno.h>
#include<string.h>

/*#include<string.h>
#include<sys/time.h>
#include<stdio.h>*/
 //  structtimevaltime_beg,time_end;
ThreadPoolManager::ThreadPoolManager()
   :m_threadPool(NULL)
   ,m_taskPool(NULL)
   ,m_bStop(false)
{
   pthread_mutex_init(&m_mutex_task,NULL);
   pthread_cond_init(&m_cond_task,NULL);

  /*memset(&time_beg,0,sizeof(structtimeval));
   memset(&time_end,0,sizeof(structtimeval));
   gettimeofday(&time_beg,NULL);*/
}

ThreadPoolManager::~ThreadPoolManager()
{
   StopAll();
   if(NULL!=m_threadPool)
   {
       deletem_threadPool;
       m_threadPool=NULL;
   }
   if(NULL!=m_taskPool)
   {
       deletem_taskPool;
       m_taskPool=NULL;
   }

   pthread_cond_destroy(&m_cond_task);
   pthread_mutex_destroy(&m_mutex_task);

   /*gettimeofday(&time_end,NULL);
   longtotal=(time_end.tv_sec-time_beg.tv_sec)*1000000+(time_end.tv_usec-time_beg.tv_usec);
   printf("managertotaltime=%d\n",total);
   gettimeofday(&time_beg,NULL);*/
}

intThreadPoolManager::Init(
       constint&tastPoolSize,
       constint&threadPoolMax,
       constint&threadPoolPre)
{
   m_threadPool=newThreadPool();
   if(NULL==m_threadPool)
   {
       return-1;
   }
   m_taskPool=newTaskPool(tastPoolSize);
   if(NULL==m_taskPool)
   {
       return-2;
   }

   if(0>m_threadPool->InitPool(threadPoolMax,threadPoolPre))
   {
       return-3;
   }
   //启动线程池
   //启动任务池
   //启动任务获取线程,从任务池中不断拿任务到线程池中
   pthread_attr_tattr;
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
   pthread_create(&m_taskThreadId,&attr,TaskThread,this);//创建获取任务进程
   pthread_attr_destroy(&attr);
   return0;
}

voidThreadPoolManager::StopAll()
{
   m_bStop=true;
   LockTask();
   pthread_cond_signal(&m_cond_task);
   UnlockTask();
   pthread_join(m_taskThreadId,NULL);
   //等待当前所有任务执行完毕
   m_taskPool->StopPool();
   m_threadPool->StopPool(true);//停止线程池工作
}

voidThreadPoolManager::LockTask()
{
   pthread_mutex_lock(&m_mutex_task);
}

voidThreadPoolManager::UnlockTask()
{
   pthread_mutex_unlock(&m_mutex_task);
}

void*ThreadPoolManager::TaskThread(void*arg)
{
   ThreadPoolManager*manager=(ThreadPoolManager*)arg;
   while(1)
   {
       manager->LockTask();//防止任务没有执行完毕发送了停止信号
       while(1)//将任务队列中的任务执行完再退出
       {
           Task*task=manager->GetTaskPool()->GetTask();
           if(NULL==task)
           {
               break;
           }
           else
           {
               manager->GetThreadPool()->Run(task->fun,task->data);
               manager->GetTaskPool()->SaveIdleTask(task);
           }
       }

       if(manager->GetStop())
       {
           manager->UnlockTask();
           break;
       }
       manager->TaskCondWait();//等待有任务的时候执行
       manager->UnlockTask();
   }
   return0;
}

ThreadPool*ThreadPoolManager::GetThreadPool()
{
   returnm_threadPool;
}

TaskPool*ThreadPoolManager::GetTaskPool()
{
   returnm_taskPool;
}

int ThreadPoolManager::Run(task_funfun,void*arg)
{
   if(0==fun)
   {
       return0;
   }
   if(!m_bStop)
   {  
       intiRet= m_taskPool->AddTask(fun,arg);

       if(iRet==0&&(0==pthread_mutex_trylock(&m_mutex_task)))
       {
           pthread_cond_signal(&m_cond_task);
           UnlockTask();
       }
       returniRet;
   }
   else
   {
       return-3;
   }
}

boolThreadPoolManager::GetStop()
{
   returnm_bStop;
}

voidThreadPoolManager::TaskCondWait()
{
   structtimespecto;
   memset(&to,0,sizeofto);
   to.tv_sec=time(0)+60;
   to.tv_nsec=0;

   pthread_cond_timedwait(&m_cond_task,&m_mutex_task,&to);//60秒超时
}

threadpoolmanager.h

复制代码代码如下:
#ifndefTHREADPOOLMANAGER_H
#defineTHREADPOOLMANAGER_H
/*purpose@
 *     基本流程:
 *         管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
 *     基本功能:
 *         1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
 *         2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
 *         3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
 *     线程资源:
 *         如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
 *         当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
 *         线程最大数为:1个TaskPool线程+1个manager任务调度线程+ThreadPool最大线程数+1个manager退出监控线程+1线程池所有线程退出监控线程
 *         线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程+1个manager创建任务调度线程
 *     使用方法:
 *         ThreadPoolManagermanager;
 *         manager.Init(100000,50,5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
 *         manager.run(fun,data);//添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
 *
 *date   @2013.12.23
 *author @haibin.wang
 *
 * 详细参数控制可以修改commondef.h中的相关变量值
 */

#include<pthread.h>
typedefvoid(*task_fun)(void*);

classThreadPool;
classTaskPool;

classThreadPoolManager
{
public:
   ThreadPoolManager();
   ~ThreadPoolManager();

   /*pur@初始化线程池与任务池,threadPoolMax>threadPoolPre>threadPoolMin>=0
    *para@tastPoolSize任务池大小
    *para@threadPoolMax线程池最大线程数
    *para@threadPoolPre预创建线程数
    *return@0:初始化成功,负数初始化失败
    *         -1:创建线程池失败
    *         -2:创建任务池失败
    *         -3:线程池初始化失败
   */
   intInit(constint&tastPoolSize,
           constint&threadPoolMax,
           constint&threadPoolPre);

   /*pur@执行一个任务
    *para@fun需要执行的函数指针
    *para@argfun需要的参数,默认为NULL
    *return@0任务分配成功,负数任务分配失败
    *         -1:任务池满
    *         -2:任务池new失败
    *         -3:manager已经发送停止信号,不再接收新任务
   */
   intRun(task_funfun,void*arg=NULL);

public://以下public函数主要用于静态函数调用
   boolGetStop();
   voidTaskCondWait();
   TaskPool*GetTaskPool();
   ThreadPool*GetThreadPool();
   voidLockTask();
   voidUnlockTask();
   voidLockFull();

private:
 staticvoid*TaskThread(void*);//任务处理线程
 voidStopAll();

private:
   ThreadPool*m_threadPool;//线程池
   TaskPool*m_taskPool;//任务池
   boolm_bStop;//是否终止管理器

   pthread_tm_taskThreadId;//TaskThread线程id
 pthread_mutex_tm_mutex_task;
   pthread_cond_tm_cond_task;
};
#endif

main.cpp

复制代码代码如下:
#include<iostream>
#include<string>
#include"threadpoolmanager.h"
#include<sys/time.h>
#include<string.h>
#include<stdlib.h>
#include<pthread.h>


usingnamespacestd;
intseq=0;
intbillNum=0;
intinter=1;
pthread_mutex_tm_mutex;
voidmyFunc(void*arg)
{
   pthread_mutex_lock(&m_mutex);
   seq++;
   if(seq%inter==0)
   {
       cout<<"fun1="<<seq<<endl;
   }
   if(seq>=1000000000)
   {
       cout<<"billion"<<endl;
       seq=0;
       billNum++;
   }
   pthread_mutex_unlock(&m_mutex);
   //sleep();
}

intmain(intargc,char**argv)
{
   if(argc!=6)
   {
       cout<<"必须有5个参数任务执行次数任务池大小线程池大小预创建线程数输出间隔"<<endl;
       cout<<"eg:./test999999100001001020"<<endl;
       cout<<"上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999"<<endl;
       return0;
   }
   doubleloopSize=atof(argv[1]);
   inttaskSize=atoi(argv[2]);
   intthreadPoolSize=atoi(argv[3]);
   intpreSize=atoi(argv[4]);
   inter=atoi(argv[5]);

   pthread_mutex_init(&m_mutex,NULL);
   ThreadPoolManagermanager;
   if(0>manager.Init(taskSize, threadPoolSize,preSize))
   {
       cout<<"初始化失败"<<endl;
       return0;
   }
   cout<<"*******************初始化完成*********************"<<endl;
   structtimevaltime_beg,time_end;
   memset(&time_beg,0,sizeof(structtimeval));
   memset(&time_end,0,sizeof(structtimeval));
   gettimeofday(&time_beg,NULL);
   doublei=0;
   for(;i<loopSize;++i)
   {
       while(0>manager.Run(myFunc,NULL))
       {
           usleep(100);
       }
   }
   gettimeofday(&time_end,NULL);
   longtotal=(time_end.tv_sec-time_beg.tv_sec)*1000000+(time_end.tv_usec-time_beg.tv_usec);
   cout<<"totaltime="<<total<<endl;
   cout<<"totalnum="<<i <<"billionnum="<<billNum<<endl;
   cout<<__FILE__<<"将关闭所有线程"<<endl;
   //pthread_mutex_destroy(&m_mutex);
   return0;
}