利用ace的ACE_Task等类实现线程池的方法详解
2023-06-13 09:15:00 时间
本代码应该是ace自带的例子了,但是我觉得是非常好的,于是给大家分享一下。
注释非常详细啊。
头文件
复制代码代码如下:
注释非常详细啊。
#ifndefTHREAD_POOL_H
#defineTHREAD_POOL_H
/*Inordertoimplementathreadpool,wehavetohaveanobjectthat
cancreateathread. TheACE_Task<>isthebasisfordoingjust
suchathing. */
#include"ace/Task.h"
//addbyychen20070714below
#include"ace/Mutex.h"
//addbyychen20070714above
#if!defined(ACE_LACKS_PRAGMA_ONCE)
#pragmaonce
#endif/*ACE_LACKS_PRAGMA_ONCE*/
/*WeneedaforwardreferenceforACE_Event_Handlersothatour
enqueue()methodcanacceptapointertoone. */
classACE_Event_Handler;
/*Althoughwemodifiedtherestofourprogramtomakeuseofthe
threadpoolimplementation,ifyoulookcloselyyou"llseethatthe
changeswereratherminor. The"ACEway"isgenerallytocreatea
helperobjectthatabstractsawaythedetailsnotrelevanttoyour
application. That"swhatI"mtryingtodoherebycreatingthe
Thread_Poolobject. */
classThread_Pool:publicACE_Task<ACE_MT_SYNCH>
{
public:
typedefACE_Task<ACE_MT_SYNCH>inherited;
/*Provideanenumerationforthedefaultpoolsize. Bydoingthis,
otherobjectscanusethevaluewhentheywantadefault. */
enumsize_t
{
default_pool_size_=1
};
//Basicconstructor
Thread_Pool(void);
/*Openingthethreadpoolcausesoneormorethreadstobe
activated. Whenactivated,theyallexecutethesvc()method
declaredbelow. */
intopen(intpool_size=default_pool_size_);
/*Somecompilerswillcomplainthatouropen()aboveattemptsto
overrideavirtualfunctioninthebaseclass. Wehaveno
intentionofoverridingthatmethodbutinordertokeepthe
compilerquietwehavetoaddthismethodasapass-thrutothe
baseclassmethod. */
virtualintopen(void*void_data)
{
returninherited::open(void_data);
}
/*
*/
virtualintclose(u_longflags=0);
/*Tousethethreadpool,youhavetoputsomeunitofworkinto
it. Sincewe"redealingwitheventhandlers(oratleasttheir
derivatives),I"vechosentoprovideanenqueue()methodthat
takesapointertoanACE_Event_Handler. Thehandler"s
handle_input()methodwillbecalled,soyourobjecthastoknow
whenitisbeingcalledbythethreadpool. */
intenqueue(ACE_Event_Handler*handler);
/*AnotherhandyACEtemplateisACE_Atomic_Op<>. When
parameterized,thisallowsistohaveathread-safecounting
object. Thetypicalarithmeticoperatorsareallinternally
thread-safesothatyoucanshareitacrossthreadswithout
worryingaboutanycontentionissues. */
typedefACE_Atomic_Op<ACE_Mutex,int>counter_t;
protected:
/*Oursvc()methodwilldequeuetheenqueuedeventhandlerobjects
andinvokethehandle_input()methodoneach. Sincewe"relikely
runninginmorethanonethread,idlethreadscantakeworkfrom
thequeuewhileotherthreadsarebusyexecutinghandle_input()on
someobject. */
intsvc(void);
/*Weusetheatomicoptokeepacountofthenumberofthreadsin
whichoursvc()methodisrunning. Thisisparticularlyimportant
whenwewanttoclose()itdown! */
counter_tactive_threads_;
};
#endif/*THREAD_POOL_H*/
//thread_pool.cpp,v1.91999/09/2203:13:42jcejExp
#include"thread_pool.h"
/*Weneedthisheadersothatwecaninvokehandle_input()onthe
objectswedequeue. */
#include"ace/Event_Handler.h"
/*Allwedohereisinitializeouractivethreadcounter. */
Thread_Pool::Thread_Pool(void)
:active_threads_(0)
{
}
/*Ouropen()methodisathindisguisearoundtheACE_Task<>
activate()method. Byhidingactivate()inthisway,theusersof
Thread_Pooldon"thavetoworryaboutthethreadconfiguration
flags. */
int
Thread_Pool::open(intpool_size)
{
returnthis->activate(THR_NEW_LWP|THR_DETACHED,pool_size);
}
/*Closingthethreadpoolcanbeatrickyexercise. I"vedecidedto
takeaneasyapproachandsimplyenqueueasecretmessageforeach
threadwehaveactive. */
int
Thread_Pool::close(u_longflags)
{
ACE_UNUSED_ARG(flags);
/*Findouthowmanythreadsarecurrentlyactive*/
intcounter=active_threads_.value();
/*Foreachoneoftheactivethreads,enqueuea"null"event
handler. Below,we"llteachoursvc()methodthat"null"means
"shutdown". */
while(counter--)
this->enqueue(0);
/*Aseachsvc()methodexits,itwilldecrementtheactivethread
counter. Wejustwaithereforittoreachzero. Sincewedon"t
knowhowlongitwilltake,wesleepforaquarterofasecond
betweentries. */
while(active_threads_.value())
ACE_OS::sleep(ACE_Time_Value(0,250000));
return(0);
}
/*Whenanobjectwantstodoworkinthepool,itshouldcallthe
enqueue()method. WeintroducetheACE_Message_Blockherebut,
unfortunately,weseriouslymisuseit. */
int
Thread_Pool::enqueue(ACE_Event_Handler*handler)
{
/*AnACE_Message_Blockisachunkofdata. Youputthemintoan
ACE_Message_Queue. ACE_Task<>hasanACE_Message_Queuebuiltin.
Infact,theparametertoACE_Task<>ispasseddirectlyto
ACE_Message_Queue. Ifyoulookbackatourheaderfileyou"llsee
thatweusedACE_MT_SYNCHastheparameterindicatingthatwewant
MultiThreadSynchsafety. Thisallowsustosafelyput
ACE_Message_Blockobjectsintothemessagequeueinonethreadand
takethemoutinanother. */
/*AnACE_Message_Blockwantstohavechar*data. Wedon"thave
that. WecouldcastourACE_Event_Handler*directlytoachar*
butIwantedtobemoreexplicit. Sincecastingpointersaround
isadangerousthing,I"vegoneoutofmywayheretobevery
clearaboutwhatwe"redoing.
First:Castthehandlerpointertoavoidpointer. Youcan"tdo
anyusefulworkonavoidpointer,sothisisaclearmessagethat
we"remakingthepointerunusable.
Next:CastthevoidpointertoacharpointerthattheACE_Message_Blockwillaccept. */
void*v_data=(void*)handler;
char*c_data=(char*)v_data;
ACE_Message_Block*mb;
/*ConstructanewACE_Message_Block. Forefficiency,youmight
wanttopreallocateastackoftheseandreusethem. For
simplicity,I"lljustcreatewhatIneedasIneedit. */
ACE_NEW_RETURN(mb,
ACE_Message_Block(c_data),
-1);
/*Ourputq()methodisawrapperaroundoneoftheenqueuemethods
oftheACE_Message_Queuethatweown. Likeallgoodmethods,it
returns-1ifitfailsforsomereason. */
if(this->putq(mb)==-1)
{
/*AnothertraitoftheACE_Message_Blockobjectsisthatthey
arereferencecounted. Sincethey"redesignedtobepassed
aroundbetweenvariousobjectsinseveralthreadswecan"t
justdeletethemwheneverwefeellikeit. Therelease()
methodissimilartothedestroy()methodwe"veused
elsewhere. Itwatchesthereferencecountandwilldeletethe
objectwhenpossible. */
mb->release();
return-1;
}
return0;
}
/*The"guard"conceptisverypowerfulandusedthroughout
multi-threadedapplications. Aguardnormallydoessomeoperation
onanobjectatconstructionandthe"opposite"operationat
destruction. Forinstance,whenyouguardamutex(lock)object,
theguardwillacquirethelockonconstructionandreleaseiton
destruction. Inthisway,yourmethodcansimplylettheguardgo
outofscopeandknowthatthelockisreleased.
Guardsaren"tonlyusefulforlockshowever. Inthisapplication
I"vecreatedtwoguardobjectsforquiteadifferentpurpose. */
/*TheCounter_Guardisconstructedwithareferencetothethread
pool"sactivethreadcounter. Theguardincrementsthecounter
whenitiscreatedanddecrementsitatdestruction. Bycreating
oneoftheseinsvc(),Iknowthatthecounterwillbedecremented
nomatterhoworwheresvc()returns. */
classCounter_Guard
{
public:
Counter_Guard(Thread_Pool::counter_t&counter)
:counter_(counter)
{
++counter_;
}
~Counter_Guard(void)
{
--counter_;
}
protected:
Thread_Pool::counter_t&counter_;
};
/*MyMessage_Block_Guardisalsoalittlenon-traditional. It
doesn"tdoanythingintheconstructorbutit"sdestructorensures
thatthemessageblock"srelease()methodiscalled. Thisisa
cheapwaytopreventamemoryleakifIneedanadditionalexit
pointinsvc(). */
classMessage_Block_Guard
{
public:
Message_Block_Guard(ACE_Message_Block*&mb)
:mb_(mb)
{
}
~Message_Block_Guard(void)
{
mb_->release();
}
protected:
ACE_Message_Block*&mb_;
};
/*Nowwecometothesvc()method. AsIsaid,thisisbeingexecuted
ineachthreadoftheThread_Pool. Here,wepullmessagesoffof
ourbuilt-inACE_Message_Queueandcausethemtodowork. */
int
Thread_Pool::svc(void)
{
/*Thegetq()methodtakesareferencetoapointer. So...weneed
apointertogiveitareferenceto. */
ACE_Message_Block*mb;
/*Createtheguardforouractivethreadcounterobject. Nomatter
wherewechoosetoreturn()fromsvc(),wenowknowthatthe
counterwillbedecremented. */
Counter_Guardcounter_guard(active_threads_);
/*Getmessagesfromthequeueuntilwehaveafailure. There"sno
realgoodreasonforfailuresoifithappens,weleave
immediately. */
while(this->getq(mb)!=-1)
{
/*Asuccessfulgetq()willcause"mb"topointtoavalid
refernce-countedACE_Message_Block. Weuseourguardobject
heresothatwe"resuretocalltherelease()methodofthat
messageblockandreduceit"sreferencecount. Oncethecount
reacheszero,itwillbedeleted. */
Message_Block_Guardmessage_block_guard(mb);
/*Asnotedbefore,theACE_Message_Blockstoresit"sdataasa
char*. Wepullthatouthereandlaterturnitintoan
ACE_Event_Handler**/
char*c_data=mb->base();
/*We"vechosentousea"null"valueasanindicationtoleave.
Ifthedatawegotfromthequeueisnotnullthenwehave
someworktodo. */
if(c_data)
{
/*Onceagain,wegotogreatlengthstoemphasizethefact
thatwe"recastingpointersaroundinratherimpolite
ways. Wecouldhavecastthechar*directlytoan
ACE_Event_Handler*butthenfolksmightthinkthat"sanOK
thingtodo.
(Note:ThecorrectwaytouseanACE_Message_Blockisto
writedataintoit. WhatIshouldhavedonewascreatea
messageblockbigenoughtoholdaneventhandlerpointer
andthenwrittenthepointervalueintotheblock. When
wegothere,Iwouldhavetoreadthatdatabackintoa
pointer. Whilepoliticallycorrect,itisalsoalotof
work. Ifyou"recarefulyoucangetawaywithcasting
pointersaround.) */
void*v_data=(void*)c_data;
ACE_Event_Handler*handler=(ACE_Event_Handler*)v_data;
/*Nowthatwefinallyhaveaneventhandlerpointer,invoke
it"shandle_input()method. Sincewedon"tknowit"s
handle,wejustgiveitadefault. That"sOKbecausewe
knowthatwe"renotusingthehandleinthemethodanyway. */
if(handler->handle_input(ACE_INVALID_HANDLE)==-1)
{
/*Tellthehandlerthatit"stimetogohome. The
"normal"methodforshuttingdownahandlerwhose
handlerfailedistoinvokehandle_close(). Thiswill
takecareofcleaningitupforus. Noticehowweuse
thehandler"sget_handle()methodtopopulateit"s
"handle"parameter. Convenientisn"tit? */
handler->handle_close(handler->get_handle(),0);
/*Alsonoticethatwedon"texitthesvc()methodhere!
ThefirsttimeIdidthis,Iwasexiting. Afterafew
clientsdisconnectyouhaveanemptythreadpool.
Hardtodoanymoreworkafterthat... */
}
}
else
/*Ifwegethere,weweregivenamessageblockwith"null"
data. Thatisoursignaltoleave,sowereturn(0)to
leavegracefully. */
return0; //Ok,shutdownrequest
//message_block_guardgoesoutofscopehereandreleasesthe
//message_blockinstance.
}
return0;
}
其中,对其中类中的两个变量使用了管理的思想。Counter_Guard类和Message_Block_Guard类分别对其进行了管理。
因为ACE_Task类是使用了ACE_message_block进行对消息的封装。因此使用类,防止了内存的泄漏。
ACE_Event_Handler 是事件句柄,类似于操作符。当我们处理的时候,对其进行处理。
相关文章
- 线程join方法用处「建议收藏」
- ssh端口转发穿越多重跳板机的方法
- 怎么安装linux和win10双系统,在Win10下安装Linux双系统的方法
- 【说站】python requests响应内容的三种方法
- 【说站】python线程通信Condition提供的方法
- 【说站】python线程安全的介绍及解决方法
- 4. 线程代码创建(五种方法)
- 【Android 异步操作】线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTask 使用线程池示例 )
- java多线程有几种实现方法线程之间如何同步详解编程语言
- Spring 中获取 request 的几种方法,及其线程安全性分析详解编程语言
- Maven项目mybatis Invalid bound statement (not found)解决方法详解编程语言
- Java通过wait()和notifyAll()方法实现线程间的通信详解编程语言
- Java 底层机制(JVM/堆/栈/方法区/GC/类加载)详解编程语言
- 线程安全与可重入编写方法详解编程语言
- Java非线程安全问题的解决方法
- Linux中终止线程的方法(linux终止线程)
- 让Linux系统启动时间加快的方法(linux系统启动时间)
- MySQL中删除用户权限的方法(删除mysql用户权限)
- 快速重命名文件:Linux批量重命名的简便方法(批量重命名linux)
- Linux下获取线程号的方法(linux获取当线程号)
- 性Oracle中实现线程安全性的方法(oracle线程安全)
- 掌握MySQL日志的查看方法(如何查看mysql 日志)
- 优化MySQL 查询性能 提升数据库操作速度的5种方法(mysql一般查询性能)
- java多线程编程之从线程返回数据的两种方法
- C#实现跨线程操作控件方法
- C#线程间不能调用剪切板的解决方法