ScalaPB(1): using protobuf in akka

2023-06-13 09:20:37

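Any instance that is passed as a message between machines running independent systems has to be serialized on one side and deserialized on the other. Consider two connected servers on a network, each running its own akka system. For these two akka systems to exchange messages, every message has to go through serialization and deserialization. By default, akka serializes user-defined message types with Java object serialization. As noted last time, Java object serialization writes the object's type information, its instance values, and the descriptions of the other types it contains into the serialized result, so the result takes up more space and is less efficient to transfer. Protobuf is a binary format that carries essentially only the instance values, so it transfers more efficiently. Below we look at how to use protobuf serialization in an akka system. Using a custom serialization method in akka involves the following steps: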

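1. Define the message types in IDL in a .proto file

2. Compile the IDL file with ScalaPB to generate Scala source code, which contains the message types and the methods that operate on them

3. Import the generated classes in the akka program modules and use these types and methods directly

4. Write the serializer in the form akka requires

5. Declare the serializer akka should use in the actor.serializers section of akka's .conf file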
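The size argument from the introduction can be checked without akka or ScalaPB. The sketch below is only an illustration under stated assumptions: AddedMsg is a hypothetical stand-in for the Added message (not the ScalaPB-generated class), and protoEncode hand-writes the protobuf wire format for two non-negative int32 fields instead of calling a protobuf library.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for the Added message; NOT the ScalaPB-generated class.
final case class AddedMsg(nbr1: Int, nbr2: Int)

object SizeComparison {
  // Number of bytes produced by default Java object serialization.
  def javaSerializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.size()
  }

  // Base-128 varint as used by protobuf (this sketch handles non-negative values only).
  def varint(value: Int): Array[Byte] = {
    require(value >= 0, "sketch handles non-negative values only")
    val buf = new ByteArrayOutputStream()
    var v = value
    while (v >= 0x80) {
      buf.write((v & 0x7f) | 0x80) // low 7 bits, continuation bit set
      v >>>= 7
    }
    buf.write(v)
    buf.toByteArray
  }

  // Wire bytes for a message with two int32 fields numbered 1 and 2.
  def protoEncode(m: AddedMsg): Array[Byte] = {
    def field(number: Int, value: Int): Array[Byte] =
      if (value == 0) Array.emptyByteArray          // proto3 omits default values
      else varint(number << 3) ++ varint(value)     // tag = field number << 3 | wire type 0
    field(1, m.nbr1) ++ field(2, m.nbr2)
  }
}
```

For AddedMsg(18, 38) the hand-written encoding is four bytes (one tag byte and one value byte per field), while javaSerializedSize also counts the stream header and the class descriptor that Java serialization writes.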

The build.sbt file below describes the project structure:

    lazy val commonSettings = Seq(
      name := "AkkaProtobufDemo",
      version := "1.0",
      scalaVersion := "2.12.6"
    )

    lazy val local = (project in file("."))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka" %% "akka-remote" % "2.5.11",
          "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
        ),
        name := "akka-protobuf-demo"
      )

    lazy val remote = (project in file("remote"))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka" %% "akka-remote" % "2.5.11"
        ),
        name := "remote-system"
      ).dependsOn(local)

    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

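local and remote are two separate projects, and we will deploy an akka system in each of them. Note the scalapb-runtime entry among the dependencies. PB.targets specifies where the generated source code is written. We also need to declare the ScalaPB plugin in project/scalapb.sbt:

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"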

We first define the messages in a .proto file:

    syntax = "proto3";

    // Brought in from scalapb-runtime
    import "scalapb/scalapb.proto";
    import "google/protobuf/wrappers.proto";

    package learn.proto;

    message Added {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
    }

    message Subtracted {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
    }

    message AddedResult {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
      int32 result = 3;
    }

    message SubtractedResult {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
      int32 result = 3;
    }
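The numbers after each field name (= 1, = 2, = 3) are what identifies a field on the wire; the field names themselves are never transmitted. As a rough illustration, the sketch below reads a buffer that contains only varint fields and returns a map from field number to value. WireReader and readVarintFields are hypothetical names chosen for this example, and the code is a hand-written reader, not the parser ScalaPB generates.

```scala
object WireReader {
  // Decodes a buffer made up solely of varint (wire type 0) fields.
  // Returns field number -> raw 64-bit value. A negative int32 is sent
  // sign-extended as a ten-byte varint, so call .toInt on the value to recover it.
  def readVarintFields(bytes: Array[Byte]): Map[Int, Long] = {
    var pos = 0

    def readVarint(): Long = {
      var result = 0L
      var shift = 0
      var more = true
      while (more) {
        val b = bytes(pos) & 0xff
        pos += 1
        result |= (b & 0x7f).toLong << shift // 7 payload bits per byte
        shift += 7
        more = (b & 0x80) != 0               // high bit set means another byte follows
      }
      result
    }

    var fields = Map.empty[Int, Long]
    while (pos < bytes.length) {
      val tag = readVarint()
      val wireType = (tag & 0x7).toInt
      require(wireType == 0, s"only varint fields are handled, got wire type $wireType")
      val fieldNumber = (tag >>> 3).toInt
      fields += (fieldNumber -> readVarint())
    }
    fields
  }
}
```

Reading the bytes 08 12 10 26 18 38 (hex) this way yields fields 1, 2 and 3 with values 18, 38 and 56, which matches an AddedResult for 18 + 38.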

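Now we define a Calculator actor in the remote project:

    package akka.protobuf.calculator

    import akka.actor._
    import com.typesafe.config.ConfigFactory
    import learn.proto.messages._

    // The receive body and the Calculator companion object were lost in this
    // copy of the article. They are reconstructed here from the message types
    // and from the Calculator.props call below, so treat them as an assumption.
    class Calculator extends Actor with ActorLogging {
      override def receive: Receive = {
        case Added(a, b) =>
          sender() ! AddedResult(a, b, a + b)
        case Subtracted(a, b) =>
          sender() ! SubtractedResult(a, b, a - b)
      }
    }

    object Calculator {
      def props = Props(new Calculator)
    }

    object CalculatorStarter extends App {
      // Override the port so this system listens on 2552
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
        .withFallback(ConfigFactory.load())
      val calcSystem = ActorSystem("calcSystem", config)
      calcSystem.actorOf(Calculator.props, "calculator")
      println("press any key to end program ...")
      scala.io.StdIn.readLine()
      calcSystem.terminate()
    }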

Running CalculatorStarter creates a calculator actor at akka.tcp://calcSystem@127.0.0.1:2552/user/calculator
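A remote actor address has the form protocol://system@host:port/path. The sketch below splits such an address into its parts, which can help when checking that the system name and port match what the remote side actually started. It uses java.net.URI as a convenient general-purpose parser, which is an assumption made for illustration and not akka's own address handling; ActorAddressParts and Parts are hypothetical names.

```scala
import java.net.URI

object ActorAddressParts {
  // Hypothetical holder for the pieces of a remote actor address.
  final case class Parts(protocol: String, system: String, host: String, port: Int, actorPath: String)

  // Splits protocol://system@host:port/path using the generic URI parser.
  def parse(address: String): Parts = {
    val uri = new URI(address)
    Parts(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort, uri.getPath)
  }
}
```

For the calculator address, the system part is calcSystem, the port is 2552, and the actor path is /user/calculator, the name given to actorOf under the user guardian.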

Next, in the local project, we deploy another akka system on port 2551 and call the calculator actor of the akka system deployed on port 2552:

    package akka.protobuf.calcservice

    import akka.actor._
    import learn.proto.messages._
    import scala.concurrent.duration._

    class CalcRunner(path: String) extends Actor with ActorLogging {
      sendIdentifyRequest()

      // Ask the remote actor to identify itself, and retry after 3 seconds
      def sendIdentifyRequest(): Unit = {
        context.actorSelection(path) ! Identify(path)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
      }

      def receive = identifying

      def identifying: Receive = {
        case ActorIdentity(calcPath, Some(calcRef)) if (path.equals(calcPath)) =>
          log.info("Remote calculator started!")
          context.watch(calcRef)
          context.become(calculating(calcRef))
        case ActorIdentity(_, None) =>
          log.info("Remote calculator not found!")
        case ReceiveTimeout =>
          sendIdentifyRequest()
        case s @ _ =>
          log.info(s"Remote calculator not ready. [$s]")
      }

      def calculating(calculator: ActorRef): Receive = {
        case (op: Added) => calculator ! op
        case (op: Subtracted) => calculator ! op
        case AddedResult(a, b, r) =>
          log.info(s"$a + $b = $r")
        case SubtractedResult(a, b, r) =>
          log.info(s"$a - $b = $r")
        // Backticks match the watched ref instead of binding a new variable
        case Terminated(`calculator`) =>
          log.info("Remote calculator terminated, restarting ...")
          sendIdentifyRequest()
          context.become(identifying)
        case ReceiveTimeout => //nothing
      }
    }

    object CalcRunner {
      def props(path: String) = Props(new CalcRunner(path))
    }

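CalcRunner is an actor. It first sends an Identify message to the calculator actor in the remote project to obtain its ActorRef, and then uses that ActorRef to interact with the calculator actor. Identify is a message type predefined by akka; all the other messages are generated by ScalaPB from the .proto file. Below is the main program of the local project: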

 

    package akka.protobuf.demo

    import akka.actor._
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import akka.protobuf.calcservice._
    import scala.concurrent.duration._
    import scala.util._
    import learn.proto.messages._

    object Main extends App {
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
        .withFallback(ConfigFactory.load())
      val calcSystem = ActorSystem("calcSystem", config)
      val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"
      val calculator = calcSystem.actorOf(CalcRunner.props(calcPath), "calcRunner")
      println("Calculator started ...")

      import calcSystem.dispatcher
      // Send a random Added or Subtracted request every second
      calcSystem.scheduler.schedule(1.second, 1.second) {
        if (Random.nextInt(100) % 2 == 0)
          calculator ! Added(Random.nextInt(100), Random.nextInt(100))
        else
          calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
      }
    }

Before a custom serializer is configured, akka logs this warning:

    [akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer

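Below is the serializer for the protobuf types:

    package akka.protobuf.serializer

    import akka.serialization.SerializerWithStringManifest
    import learn.proto.messages._

    // The class header, the identifier and the head of fromBinary were lost in
    // this copy of the article and are reconstructed here. The class name comes
    // from application.conf and the println from the program output.
    class ProtobufSerializer extends SerializerWithStringManifest {
      // Placeholder value (the original is not preserved); it must be unique
      // and outside the range akka reserves for its own serializers
      def identifier: Int = 10001

      override def manifest(o: AnyRef): String = o.getClass.getName
      final val AddedManifest = classOf[Added].getName
      final val SubtractedManifest = classOf[Subtracted].getName
      final val AddedResultManifest = classOf[AddedResult].getName
      final val SubtractedResultManifest = classOf[SubtractedResult].getName

      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
        println("inside fromBinary" + manifest)
        manifest match {
          case AddedManifest => Added.parseFrom(bytes)
          case SubtractedManifest => Subtracted.parseFrom(bytes)
          case AddedResultManifest => AddedResult.parseFrom(bytes)
          case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
        }
      }

      override def toBinary(o: AnyRef): Array[Byte] = {
        println("inside toBinary ")
        o match {
          case a: Added => a.toByteArray
          case s: Subtracted => s.toByteArray
          case aR: AddedResult => aR.toByteArray
          case sR: SubtractedResult => sR.toByteArray
        }
      }
    }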
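The idea behind a manifest-based serializer is that the sender transmits a short string (the manifest) next to the bytes, and the receiver uses that string to decide which parser to call. The sketch below models that round trip without akka on the classpath. ManifestSerializer is a simplified stand-in for akka's SerializerWithStringManifest, and Ping, Pong and DemoSerializer are hypothetical names; all of them are assumptions made for illustration. Unlike the serializer above, it adds explicit fallback cases so an unknown type fails with a descriptive error instead of a MatchError.

```scala
import java.nio.ByteBuffer

// Simplified stand-in for akka's SerializerWithStringManifest (assumption:
// only the three methods relevant to the round trip are modelled).
trait ManifestSerializer {
  def manifest(o: AnyRef): String
  def toBinary(o: AnyRef): Array[Byte]
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef
}

// Hypothetical message types used only in this sketch.
final case class Ping(n: Int)
final case class Pong(n: Int)

object DemoSerializer extends ManifestSerializer {
  // Capitalised vals are stable identifiers, so they can be used as patterns.
  private val PingManifest = classOf[Ping].getName
  private val PongManifest = classOf[Pong].getName

  def manifest(o: AnyRef): String = o.getClass.getName

  private def intBytes(n: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(n).array()

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case Ping(n) => intBytes(n)
    case Pong(n) => intBytes(n)
    case other   => throw new IllegalArgumentException(s"cannot serialize ${other.getClass.getName}")
  }

  // The bytes alone do not say whether they hold a Ping or a Pong;
  // the manifest string selects the parser.
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
    case PingManifest => Ping(ByteBuffer.wrap(bytes).getInt)
    case PongManifest => Pong(ByteBuffer.wrap(bytes).getInt)
    case other        => throw new java.io.NotSerializableException(s"unknown manifest $other")
  }
}
```

Ping(7) and Pong(7) produce identical bytes here, so only the manifest tells the receiver which type to rebuild. The same holds for Added and Subtracted, whose protobuf definitions have the same fields.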

Then we need to tell the akka system in application.conf to use this serializer:

    actor {
      serializers {
        proto = "akka.protobuf.serializer.ProtobufSerializer"
      }
      serialization-bindings {
        "java.io.Serializable" = none
        "com.google.protobuf.Message" = proto
        "learn.proto.messages.Added" = proto
        "learn.proto.messages.AddedResult" = proto
        "learn.proto.messages.Subtracted" = proto
        "learn.proto.messages.SubtractedResult" = proto
      }
    }

Now run the program again:

    [INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] Remote calculator started!
    inside toBinary
    inside fromBinarylearn.proto.messages.AddedResult
    [INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 18 + 38 = 56
    inside toBinary
    inside fromBinarylearn.proto.messages.AddedResult
    [INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 22 + 74 = 96

The system used the custom ProtobufSerializer.

Below is the complete source code for this demonstration:

project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

    lazy val commonSettings = Seq(
      name := "AkkaProtobufDemo",
      version := "1.0",
      scalaVersion := "2.12.6"
    )

    lazy val local = (project in file("."))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka" %% "akka-remote" % "2.5.11",
          "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
        ),
        name := "akka-protobuf-demo"
      )

    lazy val remote = (project in file("remote"))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka" %% "akka-remote" % "2.5.11"
        ),
        name := "remote-system"
      ).dependsOn(local)

    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

resources/application.conf

    akka {
      actor {
        provider = remote
      }
      remote {
        netty.tcp {
          hostname = "127.0.0.1"
        }
      }
      actor {
        serializers {
          proto = "akka.protobuf.serializer.ProtobufSerializer"
        }
        serialization-bindings {
          "java.io.Serializable" = none
          "com.google.protobuf.Message" = proto
          "learn.proto.messages.Added" = proto
          "learn.proto.messages.AddedResult" = proto
          "learn.proto.messages.Subtracted" = proto
          "learn.proto.messages.SubtractedResult" = proto
        }
      }
    }

main/protobuf/messages.proto

    syntax = "proto3";

    // Brought in from scalapb-runtime
    import "scalapb/scalapb.proto";
    import "google/protobuf/wrappers.proto";

    package learn.proto;

    message Added {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
    }

    message Subtracted {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
    }

    message AddedResult {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
      int32 result = 3;
    }

    message SubtractedResult {
      int32 nbr1 = 1;
      int32 nbr2 = 2;
      int32 result = 3;
    }

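remote/Calculator.scala

    package akka.protobuf.calculator

    import akka.actor._
    import com.typesafe.config.ConfigFactory
    import learn.proto.messages._

    // The receive body and the Calculator companion object were lost in this
    // copy of the article. They are reconstructed here from the message types
    // and from the Calculator.props call below, so treat them as an assumption.
    class Calculator extends Actor with ActorLogging {
      override def receive: Receive = {
        case Added(a, b) =>
          sender() ! AddedResult(a, b, a + b)
        case Subtracted(a, b) =>
          sender() ! SubtractedResult(a, b, a - b)
      }
    }

    object Calculator {
      def props = Props(new Calculator)
    }

    object CalculatorStarter extends App {
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
        .withFallback(ConfigFactory.load())
      val calcSystem = ActorSystem("calcSystem", config)
      calcSystem.actorOf(Calculator.props, "calculator")
      println("press any key to end program ...")
      scala.io.StdIn.readLine()
      calcSystem.terminate()
    }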

CalcService.scala

    package akka.protobuf.calcservice

    import akka.actor._
    import learn.proto.messages._
    import scala.concurrent.duration._

    class CalcRunner(path: String) extends Actor with ActorLogging {
      sendIdentifyRequest()

      def sendIdentifyRequest(): Unit = {
        context.actorSelection(path) ! Identify(path)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
      }

      def receive = identifying

      def identifying: Receive = {
        case ActorIdentity(calcPath, Some(calcRef)) if (path.equals(calcPath)) =>
          log.info("Remote calculator started!")
          context.watch(calcRef)
          context.become(calculating(calcRef))
        case ActorIdentity(_, None) =>
          log.info("Remote calculator not found!")
        case ReceiveTimeout =>
          sendIdentifyRequest()
        case s @ _ =>
          log.info(s"Remote calculator not ready. [$s]")
      }

      def calculating(calculator: ActorRef): Receive = {
        case (op: Added) => calculator ! op
        case (op: Subtracted) => calculator ! op
        case AddedResult(a, b, r) =>
          log.info(s"$a + $b = $r")
        case SubtractedResult(a, b, r) =>
          log.info(s"$a - $b = $r")
        case Terminated(`calculator`) =>
          log.info("Remote calculator terminated, restarting ...")
          sendIdentifyRequest()
          context.become(identifying)
        case ReceiveTimeout => //nothing
      }
    }

    object CalcRunner {
      def props(path: String) = Props(new CalcRunner(path))
    }

Main.scala

    package akka.protobuf.demo

    import akka.actor._
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import akka.protobuf.calcservice._
    import scala.concurrent.duration._
    import scala.util._
    import learn.proto.messages._

    object Main extends App {
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
        .withFallback(ConfigFactory.load())
      val calcSystem = ActorSystem("calcSystem", config)
      val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"
      val calculator = calcSystem.actorOf(CalcRunner.props(calcPath), "calcRunner")
      println("Calculator started ...")

      import calcSystem.dispatcher
      calcSystem.scheduler.schedule(1.second, 1.second) {
        if (Random.nextInt(100) % 2 == 0)
          calculator ! Added(Random.nextInt(100), Random.nextInt(100))
        else
          calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
      }
    }
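ProtobufSerializer.scala

    package akka.protobuf.serializer

    import akka.serialization.SerializerWithStringManifest
    import learn.proto.messages._

    // The file name, class header, identifier and head of fromBinary were lost
    // in this copy of the article and are reconstructed here.
    class ProtobufSerializer extends SerializerWithStringManifest {
      // Placeholder value (the original is not preserved)
      def identifier: Int = 10001

      override def manifest(o: AnyRef): String = o.getClass.getName
      final val AddedManifest = classOf[Added].getName
      final val SubtractedManifest = classOf[Subtracted].getName
      final val AddedResultManifest = classOf[AddedResult].getName
      final val SubtractedResultManifest = classOf[SubtractedResult].getName

      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
        println("inside fromBinary" + manifest)
        manifest match {
          case AddedManifest => Added.parseFrom(bytes)
          case SubtractedManifest => Subtracted.parseFrom(bytes)
          case AddedResultManifest => AddedResult.parseFrom(bytes)
          case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
        }
      }

      override def toBinary(o: AnyRef): Array[Byte] = {
        println("inside toBinary ")
        o match {
          case a: Added => a.toByteArray
          case s: Subtracted => s.toByteArray
          case aR: AddedResult => aR.toByteArray
          case sR: SubtractedResult => sR.toByteArray
        }
      }
    }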

 

 

 

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12802.html
