zl程序教程

您现在的位置是:首页 >  后端

当前栏目

利用ace的ACE_Task等类实现线程池的方法详解

方法线程 实现 详解 利用 Task ACE
2023-06-13 09:15:00 时间
本代码应该是ace自带的例子了,但是我觉得是非常好的,于是给大家分享一下。
注释非常详细啊。
头文件
复制代码代码如下:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
/* In order to implement a thread pool, we have to have an object that
   can create a thread.  The ACE_Task<> is the basis for doing just
   such a thing. */
#include "ace/Task.h"
// add by ychen 20070714 below
#include "ace/Mutex.h"
// add by ychen 20070714 above
// ACE_Atomic_Op<> backs Thread_Pool::counter_t, so include it directly
// instead of relying on a transitive include.
#include "ace/Atomic_Op.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
/* Forward declaration of ACE_Event_Handler, so that the enqueue()
   method declared below can accept a pointer to one without pulling
   in the full class definition. */

class ACE_Event_Handler;
/*Althoughwemodifiedtherestofourprogramtomakeuseofthe
  threadpoolimplementation,ifyoulookcloselyyou"llseethatthe
  changeswereratherminor. The"ACEway"isgenerallytocreatea
  helperobjectthatabstractsawaythedetailsnotrelevanttoyour
  application. That"swhatI"mtryingtodoherebycreatingthe
  Thread_Poolobject. */
classThread_Pool:publicACE_Task<ACE_MT_SYNCH>
{
public:
 typedefACE_Task<ACE_MT_SYNCH>inherited;
 /*Provideanenumerationforthedefaultpoolsize. Bydoingthis,
   otherobjectscanusethevaluewhentheywantadefault. */
 enumsize_t
 {
   default_pool_size_=1
 };
 //Basicconstructor
 Thread_Pool(void);
 /*Openingthethreadpoolcausesoneormorethreadstobe
   activated. Whenactivated,theyallexecutethesvc()method
   declaredbelow. */
 intopen(intpool_size=default_pool_size_);
 /*Somecompilerswillcomplainthatouropen()aboveattemptsto
   overrideavirtualfunctioninthebaseclass. Wehaveno
   intentionofoverridingthatmethodbutinordertokeepthe
   compilerquietwehavetoaddthismethodasapass-thrutothe
   baseclassmethod. */
 virtualintopen(void*void_data)
 {
   returninherited::open(void_data);
 }
 /*
  */
 virtualintclose(u_longflags=0);
 /*Tousethethreadpool,youhavetoputsomeunitofworkinto
   it. Sincewe"redealingwitheventhandlers(oratleasttheir
   derivatives),I"vechosentoprovideanenqueue()methodthat
   takesapointertoanACE_Event_Handler. Thehandler"s
   handle_input()methodwillbecalled,soyourobjecthastoknow
   whenitisbeingcalledbythethreadpool. */
 intenqueue(ACE_Event_Handler*handler);
 /*AnotherhandyACEtemplateisACE_Atomic_Op<>. When
   parameterized,thisallowsistohaveathread-safecounting
   object. Thetypicalarithmeticoperatorsareallinternally
   thread-safesothatyoucanshareitacrossthreadswithout
   worryingaboutanycontentionissues. */
   typedefACE_Atomic_Op<ACE_Mutex,int>counter_t;

protected:
 /*Oursvc()methodwilldequeuetheenqueuedeventhandlerobjects
   andinvokethehandle_input()methodoneach. Sincewe"relikely
   runninginmorethanonethread,idlethreadscantakeworkfrom
   thequeuewhileotherthreadsarebusyexecutinghandle_input()on
   someobject. */
 intsvc(void);
 /*Weusetheatomicoptokeepacountofthenumberofthreadsin
   whichoursvc()methodisrunning. Thisisparticularlyimportant
   whenwewanttoclose()itdown! */
 counter_tactive_threads_;
};
#endif/*THREAD_POOL_H*/

实现文件
复制代码代码如下:

// thread_pool.cpp,v 1.9 1999/09/22 03:13:42 jcej Exp
#include"thread_pool.h"
/* We need this header so that we can invoke handle_input() on the
   objects we dequeue. */
#include"ace/Event_Handler.h"
/* Constructor.  The only state to set up is the active thread
   counter, which starts at zero. */
Thread_Pool::Thread_Pool ()
  : active_threads_ (0)
{
}
/*Ouropen()methodisathindisguisearoundtheACE_Task<>
  activate()method. Byhidingactivate()inthisway,theusersof
  Thread_Pooldon"thavetoworryaboutthethreadconfiguration
  flags. */
int
Thread_Pool::open(intpool_size)
{
 returnthis->activate(THR_NEW_LWP|THR_DETACHED,pool_size);
}
/*Closingthethreadpoolcanbeatrickyexercise. I"vedecidedto
  takeaneasyapproachandsimplyenqueueasecretmessageforeach
  threadwehaveactive. */
int
Thread_Pool::close(u_longflags)
{
 ACE_UNUSED_ARG(flags);
 /*Findouthowmanythreadsarecurrentlyactive*/
 intcounter=active_threads_.value();
 /*Foreachoneoftheactivethreads,enqueuea"null"event
   handler. Below,we"llteachoursvc()methodthat"null"means
   "shutdown". */
 while(counter--)
   this->enqueue(0);
 /*Aseachsvc()methodexits,itwilldecrementtheactivethread
   counter. Wejustwaithereforittoreachzero. Sincewedon"t
   knowhowlongitwilltake,wesleepforaquarterofasecond
   betweentries. */
 while(active_threads_.value())
   ACE_OS::sleep(ACE_Time_Value(0,250000));
 return(0);
}
/* When an object wants to do work in the pool, it should call the
   enqueue() method.  The handler pointer is wrapped in an
   ACE_Message_Block and placed on the task's message queue; a null
   handler is the pool's own "shutdown" marker (see close() and
   svc()).

   Returns 0 on success, -1 if the message block could not be
   allocated or putq() failed. */
int
Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
  /* An ACE_Message_Block is a chunk of data that you put into an
     ACE_Message_Queue, and ACE_Task<> has such a queue built in.
     Because the task is parameterized with ACE_MT_SYNCH, blocks can
     safely be put on the queue in one thread and taken out in
     another. */

  /* An ACE_Message_Block wants char* data, which we don't have.  We
     deliberately go through void* first to make it explicit that the
     pointer is only being smuggled through the block, not used as
     character data.  svc() performs the reverse conversion. */
  void *v_data = static_cast<void *> (handler);
  char *c_data = static_cast<char *> (v_data);

  /* Construct a new ACE_Message_Block.  For efficiency, you might
     want to preallocate a stack of these and reuse them.  For
     simplicity, one is created per request.  ACE_NEW_RETURN makes us
     return -1 if the allocation fails. */
  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (c_data),
                  -1);

  /* putq() is a wrapper around one of the enqueue methods of the
     ACE_Message_Queue that we own.  It returns -1 on failure. */
  if (this->putq (mb) == -1)
    {
      /* Message blocks are reference counted, so we don't delete the
         block ourselves; release() drops our reference and the block
         is deleted when possible. */
      mb->release ();
      return -1;
    }

  return 0;
}
/*The"guard"conceptisverypowerfulandusedthroughout
  multi-threadedapplications. Aguardnormallydoessomeoperation
  onanobjectatconstructionandthe"opposite"operationat
  destruction. Forinstance,whenyouguardamutex(lock)object,
  theguardwillacquirethelockonconstructionandreleaseiton
  destruction. Inthisway,yourmethodcansimplylettheguardgo
  outofscopeandknowthatthelockisreleased.
  Guardsaren"tonlyusefulforlockshowever. Inthisapplication
  I"vecreatedtwoguardobjectsforquiteadifferentpurpose. */
/*TheCounter_Guardisconstructedwithareferencetothethread
  pool"sactivethreadcounter. Theguardincrementsthecounter
  whenitiscreatedanddecrementsitatdestruction. Bycreating
  oneoftheseinsvc(),Iknowthatthecounterwillbedecremented
  nomatterhoworwheresvc()returns. */
classCounter_Guard
{
public:
 Counter_Guard(Thread_Pool::counter_t&counter)
   :counter_(counter)
 {
   ++counter_;
 }
 ~Counter_Guard(void)
 {
   --counter_;
 }
protected:
 Thread_Pool::counter_t&counter_;
};
/*MyMessage_Block_Guardisalsoalittlenon-traditional. It
  doesn"tdoanythingintheconstructorbutit"sdestructorensures
  thatthemessageblock"srelease()methodiscalled. Thisisa
  cheapwaytopreventamemoryleakifIneedanadditionalexit
  pointinsvc(). */
classMessage_Block_Guard
{
public:
 Message_Block_Guard(ACE_Message_Block*&mb)
   :mb_(mb)
 {
 }
 ~Message_Block_Guard(void)
 {
   mb_->release();
 }
protected:
 ACE_Message_Block*&mb_;
};
/*Nowwecometothesvc()method. AsIsaid,thisisbeingexecuted
  ineachthreadoftheThread_Pool. Here,wepullmessagesoffof
  ourbuilt-inACE_Message_Queueandcausethemtodowork. */
int
Thread_Pool::svc(void)
{
 /*Thegetq()methodtakesareferencetoapointer. So...weneed
   apointertogiveitareferenceto. */
 ACE_Message_Block*mb;
 /*Createtheguardforouractivethreadcounterobject. Nomatter
   wherewechoosetoreturn()fromsvc(),wenowknowthatthe
   counterwillbedecremented. */
 Counter_Guardcounter_guard(active_threads_);
 /*Getmessagesfromthequeueuntilwehaveafailure. There"sno
   realgoodreasonforfailuresoifithappens,weleave
   immediately. */
 while(this->getq(mb)!=-1)
   {
     /*Asuccessfulgetq()willcause"mb"topointtoavalid
       refernce-countedACE_Message_Block. Weuseourguardobject
       heresothatwe"resuretocalltherelease()methodofthat
       messageblockandreduceit"sreferencecount. Oncethecount
       reacheszero,itwillbedeleted. */
     Message_Block_Guardmessage_block_guard(mb);
     /*Asnotedbefore,theACE_Message_Blockstoresit"sdataasa
       char*. Wepullthatouthereandlaterturnitintoan
       ACE_Event_Handler**/
     char*c_data=mb->base();
     /*We"vechosentousea"null"valueasanindicationtoleave.
       Ifthedatawegotfromthequeueisnotnullthenwehave
       someworktodo. */
     if(c_data)
       {
         /*Onceagain,wegotogreatlengthstoemphasizethefact
           thatwe"recastingpointersaroundinratherimpolite
           ways. Wecouldhavecastthechar*directlytoan
           ACE_Event_Handler*butthenfolksmightthinkthat"sanOK
           thingtodo.
           (Note:ThecorrectwaytouseanACE_Message_Blockisto
           writedataintoit. WhatIshouldhavedonewascreatea
           messageblockbigenoughtoholdaneventhandlerpointer
           andthenwrittenthepointervalueintotheblock. When
           wegothere,Iwouldhavetoreadthatdatabackintoa
           pointer. Whilepoliticallycorrect,itisalsoalotof
           work. Ifyou"recarefulyoucangetawaywithcasting
           pointersaround.) */
         void*v_data=(void*)c_data;
         ACE_Event_Handler*handler=(ACE_Event_Handler*)v_data;
         /*Nowthatwefinallyhaveaneventhandlerpointer,invoke
           it"shandle_input()method. Sincewedon"tknowit"s
           handle,wejustgiveitadefault. That"sOKbecausewe
           knowthatwe"renotusingthehandleinthemethodanyway. */
         if(handler->handle_input(ACE_INVALID_HANDLE)==-1)
           {
             /*Tellthehandlerthatit"stimetogohome. The
               "normal"methodforshuttingdownahandlerwhose
               handlerfailedistoinvokehandle_close(). Thiswill
               takecareofcleaningitupforus. Noticehowweuse
               thehandler"sget_handle()methodtopopulateit"s
               "handle"parameter. Convenientisn"tit? */
             handler->handle_close(handler->get_handle(),0);
             /*Alsonoticethatwedon"texitthesvc()methodhere!
               ThefirsttimeIdidthis,Iwasexiting. Afterafew
               clientsdisconnectyouhaveanemptythreadpool.
               Hardtodoanymoreworkafterthat... */
           }
       }
     else
       /*Ifwegethere,weweregivenamessageblockwith"null"
          data. Thatisoursignaltoleave,sowereturn(0)to
          leavegracefully. */
         return0;  //Ok,shutdownrequest
     //message_block_guardgoesoutofscopehereandreleasesthe
     //message_blockinstance.
   }
 return0;
}

上面的代码对两种资源采用了“守卫”(guard)的管理思想:Counter_Guard 类管理活动线程计数器,Message_Block_Guard 类管理消息块。
因为 ACE_Task 类使用 ACE_Message_Block 对消息进行封装,所以用 Message_Block_Guard 在析构时调用消息块的 release() 方法,防止了内存泄漏。
ACE_Event_Handler 是事件处理器,类似于回调对象。线程池从队列中取出它之后,调用其 handle_input() 方法进行处理。