zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Scala基于Akka模拟Spark Master Worker进程间通信(二):Worker定时向Master心跳

标签:Scala、Spark、Akka、进程间通信、定时心跳、Master、Worker
2023-09-14 09:02:03 时间

最终效果

在这里插入图片描述
在这里插入图片描述

Master

package cn.zxl.spark.master

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import cn.zxl.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
 * @description:
 * @author: zhangxueliang
 * @create: 2021-05-29 16:37
 * @version: 1.0
 * */
/**
 * Master actor: accepts worker registrations and tracks worker liveness
 * via heartbeat messages.
 */
class SparkMaster extends Actor {

  // Registry of known workers, keyed by worker id.
  private val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {
    case "start" => println("Spark Master服务器启动了!")
    case RegisterWorkerInfo(id, cpu, ram) =>
      // Worker registration; duplicates are ignored so re-sent registrations are idempotent.
      if (!workers.contains(id)) {
        val workerInfo = new WorkerInfo(id, cpu, ram)
        workers += (id -> workerInfo)
        println("服务器当前的workers=" + workers)
        // Acknowledge so the worker starts its heartbeat timer.
        sender() ! RegisteredWorkerInfo
      }
    case HeartBeat(id) =>
      // Refresh the worker's heartbeat timestamp. Guard the lookup: the original
      // `workers(id)` threw NoSuchElementException for an unknown id (e.g. a worker
      // that registered before a master restart).
      workers.get(id) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartBeat = System.currentTimeMillis()
          println("master更新了 "+id+" 的心跳时间")
        case None =>
          // Unknown worker: ignore rather than crash the actor.
          println("master收到未注册worker的心跳, 已忽略: " + id)
      }
  }
}

object SparkMaster {
  /** Entry point: boots a remote-enabled ActorSystem and starts the master actor. */
  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1"
    val port = 10005
    // Remoting configuration: expose this ActorSystem over akka-remote TCP.
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
""".stripMargin)
    // Build the ActorSystem, then the master actor, then kick it off.
    val system: ActorSystem = ActorSystem("SparkMaster", config)
    val masterRef: ActorRef = system.actorOf(Props[SparkMaster], "SparkMaster-01")
    masterRef ! "start"
  }
}

Worker

package cn.zxl.spark.worker

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import cn.zxl.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * @description:
 * @author: zhangxueliang
 * @create: 2021-05-29 16:47
 * @version: 1.0
 * */
/**
 * Worker actor: registers itself with the remote master and then sends a
 * heartbeat every 3 seconds.
 *
 * @param masterHost host the master ActorSystem is bound to
 * @param masterPort port the master ActorSystem is bound to
 */
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {
  // Remote reference (proxy) to the master actor; resolved in preStart.
  var masterProxy: ActorSelection = _
  // Unique id identifying this worker instance to the master.
  private val id: String = java.util.UUID.randomUUID().toString

  override def preStart(): Unit = {
    println("preStart()调用")
    // Resolve the master actor by its remote path (system name + host:port + actor name).
    masterProxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/SparkMaster-01")
    println("masterProxy=" + masterProxy)
  }

  override def receive: Receive = {
    case "start" =>
      println("worker启动了")
      // Register with 16 cores and 16 * 1024 RAM units (demo values).
      masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
    case RegisteredWorkerInfo =>
      println("workerid=" + id + "注册成功")
      import context.dispatcher
      // Heartbeat timer: fire immediately (0.millis), then every 3000 ms,
      // sending SendHeartBeat to self. Dotted `0.millis` replaces the original
      // postfix `0 millis`, which needs `scala.language.postfixOps`.
      context.system.scheduler.schedule(0.millis, 3000.millis, self, SendHeartBeat)
    case SendHeartBeat =>
      println("worker= " + id + " 向master发送了心跳")
      masterProxy ! HeartBeat(id)
  }
}

object SparkWorker {
  /**
   * Entry point: boots the worker ActorSystem and tells the worker to register.
   *
   * Fix: the original declared workerHost/workerPort but never used them — the
   * config string hard-coded 127.0.0.1 and a *different* port (10002). The
   * declared vals are now interpolated so there is a single source of truth.
   */
  def main(args: Array[String]): Unit = {
    val workerHost = "127.0.0.1"
    val workerPort = 10001
    val masterHost = "127.0.0.1"
    val masterPort = 10005
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$workerHost
         |akka.remote.netty.tcp.port=$workerPort
         """.stripMargin)
    // Create the worker's ActorSystem (runs as a separate process from the master).
    val sparkWorkerSystem: ActorSystem = ActorSystem("SparkWorker", config)
    // Create the worker actor, pointing it at the master's host/port.
    val sparkWorkerRef: ActorRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker-01")
    sparkWorkerRef ! "start"
  }
}

MessageProtocol

package cn.zxl.spark.common

/**
 * @description:
 * @author: zhangxueliang
 * @create: 2021-05-29 17:12
 * @version: 1.0
 * */
// Registration request a worker sends to the master.
// NOTE(review): units of `ram` are not stated in SOURCE (worker sends 16 * 1024) — presumably MB; confirm.
final case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// Server-side record of a registered worker, kept in the master's worker map.
// `lastHeartBeat` is mutable: the master refreshes it on each HeartBeat message.
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeat: Long = System.currentTimeMillis()
}

// Reply the master sends back once a worker registers successfully.
case object RegisteredWorkerInfo

// Message a worker's scheduler sends to itself to trigger a heartbeat.
case object SendHeartBeat

// Heartbeat protocol message sent from a worker to the master.
final case class HeartBeat(id: String)

pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxl</groupId>
    <artifactId>scala-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>scala-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.5.12</akka.version>
        <scala.actors.version>2.10.0-M6</scala.actors.version>
    </properties>

    <dependencies>
        <!-- scala-actors is deprecated since Scala 2.11 and unusable; kept here commented out for reference -->
        <!--<dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-actors</artifactId>
            <version>${scala.actors.version}</version>
        </dependency>-->

        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- akka-remote: actor communication between separate processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Locations of the main and test Scala source directories -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bind compile/testCompile goals to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <!-- NOTE(review): ".scala_depencencies" looks like a typo of ".scala_dependencies";
                                     harmless (it is only the dependency-tracking file name) but worth fixing -->
                                <arg>${project.build.directory}/.scala_depencencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Maven packaging plugin -->
            <!-- Both maven-assembly-plugin and maven-shade-plugin build fat jars. On same-named resource
                 files, assembly overwrites while shade can append (needed for Akka's reference.conf),
                 hence shade is used here -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar is not rejected as tampered -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Merge (append) every reference.conf so Akka's default config survives shading -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Sets the jar's Main-Class.
                                     NOTE(review): "xxx" is a placeholder — replace with a real entry point,
                                     e.g. cn.zxl.spark.master.SparkMaster, before packaging -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>xxx</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>