Akka并发编程——第三节:Actor模型(二)
2023-09-14 09:00:24 时间
//context变量暴露当前Actor的上下文信息及当前消息
implicit val context : akka.actor.ActorContext = { /* compiled code */ }
//self作为当前ActorRef的引用
implicit final val self : akka.actor.ActorRef = { /* compiled code */ }
//当前Actor接收到最后一条消息对应的消息发送者(Actor)
final def sender() : akka.actor.ActorRef = { /* compiled code */ }
//receive方法,抽象方法,定义Actor的行为逻辑
def receive : akka.actor.Actor.Receive
//内部使用API
protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ }
protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ }
protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ }
protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }
protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }
//监督策略,用于Actor容错处理 def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ } //Hook方法,用于Actor生命周期监控 @scala.throws[T](classOf[scala.Exception]) def preStart() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postStop() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }
//发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法 def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } object Actor extends scala.AnyRef { type Receive = scala.PartialFunction[scala.Any, scala.Unit] //空的行为逻辑 @scala.SerialVersionUID(1) object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive { def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ } def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ } //Sender为null @scala.SerialVersionUID(1) final val noSender : akka.actor.ActorRef = { /* compiled code */ }
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myChild") def receive = { //向MyActor发送消息 case x = child ! x;log.info("received "+x) //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in FirstActor")
class MyActor extends Actor with ActorLogging{ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in MyActor") def receive = { case "test" = log.info("received test") case _ = log.info("received unknown message") //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in MyActor") val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
//通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myActor") def receive = { //向MyActor发送消息 case x = child ! x;log.info("received "+x)
def receive = { case "test" = log.info("received test");sender()!"message from MyActor" case "message from self reference"= log.info("message from self refrence") case _ = log.info("received unknown message"); val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
def receive = { case "test" = log.info("received test");sender()!"message from MyActor" case "message from self reference"= log.info("message from self refrence") case _ = log.info("received unknown message"); }
//创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
override def unhandled(message: Any): Unit = { log.info("unhandled message is {}",message)
//创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
Actor并发编程模型浅析 Actor 模型其实就是定义一组规则,这些规则规定了一组系统中各个模块如何交互及回应。在一个 Actor 系统中,Actor 是最小的单元模块,系统由多个 Actor 组成。每个 Actor 有两个东西,一个是 mailbox,一个是自身状态。
akka设计模式系列-消息模型 通过前面的文章我们总结了几个常见的actor设计模式,但此处不得不提前介绍一下在Akka中消息的设计模式。随着对Akka的使用,我们会发现,使用Akka设计系统其实就是面向消息编程。actor之间消息设计的是否合理,往往意味着Akka应用设计的是否合理。
akka设计模式系列(Actor模型) 谈到Akka就必须介绍Actor并发模型,而谈到Actor就必须看一篇叫做《A Universal Modular Actor Formalism for Artificial Intelligence 》的论文,它最早发表于1973年,提出了一种并发计算的理论模型,Actor就源于该模型。
Akka中的Typed Actor是Active Objects设计模式的实现,Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性。Typed Actor由公用的接口和对应实现两部分构成,其后面深层次
//监督策略,用于Actor容错处理 def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ } //Hook方法,用于Actor生命周期监控 @scala.throws[T](classOf[scala.Exception]) def preStart() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postStop() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }
//发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法 def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } object Actor extends scala.AnyRef { type Receive = scala.PartialFunction[scala.Any, scala.Unit] //空的行为逻辑 @scala.SerialVersionUID(1) object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive { def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ } def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ } //Sender为null @scala.SerialVersionUID(1) final val noSender : akka.actor.ActorRef = { /* compiled code */ }
//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myChild") def receive = { //向MyActor发送消息 case x = child ! x;log.info("received "+x) //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in FirstActor")
class MyActor extends Actor with ActorLogging{ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in MyActor") def receive = { case "test" = log.info("received test") case _ = log.info("received unknown message") //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in MyActor") val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
代码运行结果:
[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message [INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor [INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor
在代码
class FirstActor extends Actor with ActorLogging{ //通过context.actorOf方法创建Actor var child:ActorRef = _ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myChild") def receive = { //向MyActor发送消息 case x = child ! x;log.info("received "+x) //Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作 override def postStop(): Unit = { log.info("postStop() in FirstActor") }
中分别对postStop、preStart方法进行了重写,在preStart方法中通过代码
//创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
创建FirstActor时,便会调用preStart方法完成MyActor的创建,因此首先会执行FirstActor中的preStart()方法
dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
然后在创建MyActor时执行MyActor中定义的preStart方法
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
在执行
FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。
//通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myActor") def receive = { //向MyActor发送消息 case x = child ! x;log.info("received "+x)
def receive = { case "test" = log.info("received test");sender()!"message from MyActor" case "message from self reference"= log.info("message from self refrence") case _ = log.info("received unknown message"); val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
运行结果:
[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
代码:
def receive = { case "test" = log.info("received test");sender()!"message from MyActor" case "message from self reference"= log.info("message from self refrence") case _ = log.info("received unknown message"); }
中使用
def receive = { case "test" = log.info("received test");sender()!"message from MyActor"
“test”消息时,会调用
处理消息时又向MyActor回送该消息,因此最终的输出有两个unknown message,分别对应123和”message from MyActor”
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码
//创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
代码输出:
[INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
不难看出,对于
发送的这条消息没有被处理,没有任何的处理逻辑。在实际开发过程中,可能会对不能被处理的消息增加一些应对逻辑,此时可以重写unhandled方法,代码如下:
override def unhandled(message: Any): Unit = { log.info("unhandled message is {}",message)
//创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }
代码输出结果:
[INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123
其它如preRestart等方法的使用将在Akka容错部分进行讲解。
Scala学习(公众微信号:ScalaLearning)每天为大家带来一点Scala语言、Spark、Kafka、Flink、AKKA等大数据技术干货及相关技术资讯。技术永无止境,勇攀高峰,一往直前!
觉得文章不错?扫描关注
Actor并发编程模型浅析 Actor 模型其实就是定义一组规则,这些规则规定了一组系统中各个模块如何交互及回应。在一个 Actor 系统中,Actor 是最小的单元模块,系统由多个 Actor 组成。每个 Actor 有两个东西,一个是 mailbox,一个是自身状态。
akka设计模式系列-消息模型 通过前面的文章我们总结了几个常见的actor设计模式,但此处不得不提前介绍一下在Akka中消息的设计模式。随着对Akka的使用,我们会发现,使用Akka设计系统其实就是面向消息编程。actor之间消息设计的是否合理,往往意味着Akka应用设计的是否合理。
akka设计模式系列(Actor模型) 谈到Akka就必须介绍Actor并发模型,而谈到Actor就必须看一篇叫做《A Universal Modular Actor Formalism for Artificial Intelligence 》的论文,它最早发表于1973年,提出了一种并发计算的理论模型,Actor就源于该模型。
Akka中的Typed Actor是Active Objects设计模式的实现,Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性。Typed Actor由公用的接口和对应实现两部分构成,其后面深层次
相关文章
- 并发编程篇:java 高并发面试题
- 同步类容器和并发类容器的区别_jdk提供的用于并发编程的同步器有
- JUC并发编程01——谈谈锁机制:轻量级锁、重量级锁、偏向锁、锁消除与锁优化
- Jenkins添加html报告并发送到邮箱
- 高并发图数据库系统如何实现?
- java并发编程(七)
- 【Java并发编程】- 02 线程池总结
- Go语言——并发编程
- Go-并发编程-声明和使用 channel
- Go-并发编程-无缓冲和有缓冲 channel 的区别(一)
- Go-并发编程-无缓冲和有缓冲 channel 的区别(二)
- [javaSE] 并发编程(线程间通信)详解编程语言
- 一些常见的并发编程错误
- Nginx搭建高可用,高并发的WCF集群教程
- Redis并发操作如何保证数据安全?(redis并发安全)
- 解决Oracle并发性能问题的归档模式设置(oracle归档模式设置)
- 性能MySQL 1万并发性能提升:实现服务的卓越性能(mysql1万并发)
- Redis瞬间并发失效面临严峻考验(瞬时并发redis失效)
- 抽奖大赛用Redis实现并发锁保障公平公正(抽奖redis并发锁)
- Redis实现高效的金额计算处理(redis高并发金额计算)
- Redis系统支持高并发,满足点赞需求(redis高并发点赞)
- Redis集群实现并发限制(redis集群并发限制)
- Redis解决超高并发时的防重时间差(redis防重时间差)
- Go语言并发模型的2种编程方案
- Python中的并发编程实例