Apache Spark源码走读(五)部署模式下的容错性分析 &standalone cluster模式下资源的申请与释放
本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的。
Standalone部署的节点组成介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多。
在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外是Mesos和YARN,要理解其内部运行机理,显然要花更多的时间才能了解清楚。
standalone cluster的组成standalone集群由三个不同级别的节点组成,分别是:
Worker 工作节点 ,这个是manager,是分舵主, 在整个集群中,可以有多个worker,如果worker为零,什么事也做不了 Executor 干苦力活的,直接受worker掌控,一个worker可以启动多个executor,启动的个数受限于机器中的cpu核数
这三种不同类型的节点各自运行于自己的JVM进程之中。
Driver Application提交到standalone集群的应用程序称之为Driver Applicaton。
Standalone集群启动及任务提交过程详解
上图总结了正常情况下Standalone集群的启动以及应用提交时,各节点之间有哪些消息交互。下面分集群启动和应用提交两个过程来作详细说明。
集群启动过程正常启动过程如下所述
step 1: 启动master$SPARK_HOME/sbin/start-master.shstep 2: 启动worker
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077
worker启动之后,会做两件事情
将自己注册到Master, RegisterWorker 定期发送心跳消息给Master 任务提交过程 step 1: 提交application利用如下指令来启动spark-shell
MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell
运行spark-shell时,会向Master发送RegisterApplication请求
日志位置: master运行产生的日志在$SPARK_HOME/logs目录下
step 2: Master处理RegisterApplication的请求之后收到RegisterApplication请求之后,Mastet会做如下处理
如果有worker已经注册上来,发送LaunchExecutor指令给相应worker 如果没有,则什么事也不做 step 3: 启动ExecutorWorker在收到LaunchExecutor指令之后,会启动Executor进程
step 4: 注册Executor启动的Executor进程会根据启动时的入参,将自己注册到Driver中的SchedulerBackend
日志位置: executor的运行日志在$SPARK_HOME/work目录下
step 5: 运行TaskSchedulerBackend收到Executor的注册消息之后,会将提交到的Spark Job分解为多个具体的Task,然后通过LaunchTask指令将这些Task分散到各个Executor上真正的运行
如果在调用runJob的时候,没有任何的Executor注册到SchedulerBackend,相应的处理逻辑是什么呢?
SchedulerBackend会将Task存储在TaskManager中 一旦有Executor注册上来,就将TaskManager管理的尚未运行的task提交到executor中 如果有多个job处于pending状态,默认调度策略是FIFO,即先提交的先运行 启动Master 启动spark-shell 执行 sc.textFile("README.md").count 启动worker 注意worker启动之后,spark-shell中打印出来的日志消息 Job执行结束任务运行结束时,会将相应的Executor停掉。
可以做如下的试验
停止spark-shell 利用ps -ef|grep -i java查看java进程,可以发现CoarseGrainedExecutorBackend进程已经退出通过上面的控制消息原语之间的先后顺序可以看出
Master和worker进程必须显式启动 executor是被worker隐式的带起 集群的启动顺序 Master必须先于其它节点启动 worker和driver哪个先启动,无所谓 但driver提交的job只有在有相应的worker注册到Master之后才可以被真正的执行异常场景分析
上面说明的是正常情况下,各节点的消息分发细节。那么如果在运行中,集群中的某些节点出现了问题,整个集群是否还能够正常处理Application中的任务呢?
异常分析1: worker异常退出在Spark运行过程中,经常碰到的问题就是worker异常退出,当worker退出时,整个集群会有哪些故事发生呢? 请看下面的具体描述:
worker异常退出,比如说有意识的通过kill指令将worker杀死 worker在退出之前,会将自己所管控的所有小弟executor全干掉 worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个“分舵”离开了 Master非常伤心,伤心的Master将情况汇报给了相应的Driver Driver通过两方面确认分配给自己的Executor不幸离开了,一是Master发送过来的通知,二是Driver没有在规定时间内收到Executor的StatusUpdate,于是Driver会将注册的Executor移除worker异常退出会带来哪些影响
executor退出导致提交的task无法正常结束,会被再一次提交运行 如果所有的worker都异常退出,则整个集群不可用 需要有相应的程序来重启worker进程,比如使用supervisord或runitworkerThread = new Thread("ExecutorRunner for " + fullId) { override def run() { fetchAndRunExecutor() } workerThread.start() // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { killProcess(Some("Worker shutting down")) Runtime.getRuntime.addShutdownHook(shutdownHook)
killProcess的过程就是停止相应CoarseGrainedExecutorBackend的过程。
worker停止的时候,一定要先将自己启动的Executor停止掉。这是不是很像水浒中宋江的手段,李逵就是这样不明不白的把命给丢了。
需要特别指出的是,当worker在启动Executor的时候,是通过ExecutorRunner来完成的,ExecutorRunner是一个独立的线程,和Executor是一对一的关系,这很重要。Executor作为一个独立的进程在运行,但会受到ExecutorRunner的严密监控。
异常分析2: executor异常退出Executor作为Standalone集群部署方式下的最底层员工,一旦异常退出,其后果会是什么呢?
executor异常退出,ExecutorRunner注意到异常,将情况通过ExecutorStateChanged汇报给Master Master收到通知之后,非常不高兴,尽然有小弟要跑路,那还了得,要求Executor所属的worker再次启动 Worker收到LaunchExecutor指令,再次启动executor作为一名底层员工,想轻易摞挑子不干是不成的。"人在江湖,身不由己“啊。
启动Master 启动Worker 启动spark-shell 手工kill掉CoarseGrainedExecutorBackend fetchAndRunExecutorfetchAndRunExecutor负责启动具体的Executor,并监控其运行状态,具体代码逻辑如下所示
def fetchAndRunExecutor() { try { // Create the executors working directory val executorDir = new File(workDir, appId + "/" + execId) if (!executorDir.mkdirs()) { throw new IOException("Failed to create directory " + executorDir) // Launch the process val command = getCommandSeq logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() for ((key, value) { logInfo("Runner thread for executor " + fullId + " interrupted") state = ExecutorState.KILLED killProcess(None) case e: Exception = { logError("Error running executor", e) state = ExecutorState.FAILED killProcess(Some(e.toString))异常分析3: master 异常退出
worker和executor异常退出的场景都讲到了,我们剩下最后一种情况了,master挂掉了怎么办?
带头大哥如果不在了,会是什么后果呢?
worker没有汇报的对象了,也就是如果executor再次跑飞,worker是不会将executor启动起来的,大哥没给指令 无法向集群提交新的任务 老的任务即便结束了,占用的资源也无法清除,因为资源清除的指令是Master发出的怎么样,知道后果很严重了吧?别看老大平时不干活,要真的不在,仅凭小弟们是不行的。
Master单点失效问题的解决那么怎么解决Master单点失效的问题呢?
你说再加一个Master就是了,两个老大。两个老大如果同时具有指挥权,结果也将是灾难性的。设立一个副职人员,当目前的正职挂掉之后,副职接管。也就是同一时刻,有且只有一个active master。
注意不错,如何实现呢?使用zookeeper的ElectLeader功能,效果图如下
如何搭建zookeeper集群,这里不再废话,哪天有空的话再整一整,或者可以参考写的storm系列中谈到的zookeeper的集群安装步骤。
假设zookeeper集群已经设置成功,那么如何启动standalone集群中的节点呢?有哪些特别的地方?
conf/spark-env.sh在conf/spark-env.sh中,为SPARK_DAEMON_JAVA_OPTS添加如下选项
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).
Master 掌管整个cluster的资源,主要是指cpu core和memory,但Master自身并不拥有这些资源 Worker 计算资源的实际贡献者,须向Master汇报自身拥有多少cpu core和memory, 在master的指示下负责启动executor Executor 执行真正计算的苦力,由master来决定该进程拥有的core和memory数值 Driver 资源的实际占用者,Driver会提交一到多个job,每个job在拆分成多个task之后,会分发到各个executor真正的执行
这些内容在standalone cluster模式下的容错性分析中也有所涉及,今天主要讲一下资源在分配之后不同场景下是如何被顺利回收的。
资源上报汇聚过程standalone cluster下最主要的当然是master,master必须先于worker和driver程序正常启动。
当master顺利启动完毕,可以开始worker的启动工作,worker在启动的时候需要向master发起注册,在注册消息中带有本worker节点的cpu core和内存。
调用顺序如下preStart- registerWithMaster- tryRegisterAllMasters
看一看tryRegisterAllMasters的代码
def tryRegisterAllMasters() { for (masterUrl - masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
我们的疑问是RegisterWorker构造函数所需的参数memory和cores是从哪里获取的呢?
注意一下Worker中的main函数会创建WorkerArguments,
def main(argStrings: Array[String]) { SignalLogger.register(log) val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) actorSystem.awaitTermination()
memory通过函数inferDefaultMemory获取,而cores通过inferDefaultCores获取。
def inferDefaultCores(): Int = { Runtime.getRuntime.availableProcessors() def inferDefaultMemory(): Int = { val ibmVendor = System.getProperty("java.vendor").contains("IBM") var totalMb = 0 try { val bean = ManagementFactory.getOperatingSystemMXBean() if (ibmVendor) { val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean") val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory") totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt } else { val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean") val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize") totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt } catch { case e: Exception = { totalMb = 2*1024 System.out.println("Failed to get total physical memory. Using " + totalMb + " MB") // Leave out 1 GB for the operating system, but dont return a negative memory size math.max(totalMb - 1024, 512)
如果已经在配置文件中为显示指定了每个worker的core和memory,则使用配置文件中的值,具体配置参数为SPARK_WORKER_CORES和SPARK_WORKER_MEMORY
Master在收到RegisterWork消息之后,根据上报的信息为每一个worker创建相应的WorkerInfo.
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) = logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { // ignore, dont send response } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } else { val workerAddress = worker.actor.path.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)资源分配过程
如果在worker注册上来的时候,已经有Driver Application注册上来,那么就需要将原先处于未分配资源状态的driver application启动相应的executor。
WorkerInfo在schedule函数中会被使用到,schedule函数处理逻辑概述如下
查看目前存活的worker中剩余的内存是否能够满足application每个task的最低需求,如果是则将该worker加入到可分配资源的队列 根据分发策略,如果是决定将工作平摊到每个worker,则每次在一个worker上占用一个core,直到所有可分配资源耗尽或已经满足driver的需求 如果分发策略是分发到尽可能少的worker,则一次占用尽worker上的可分配core,直到driver的core需求得到满足 根据步骤2或3的结果在每个worker上添加相应的executor,处理函数是addExecutor为了叙述简单,现仅列出平摊到各个worker的分配处理过程
for (worker workers if worker.coresFree 0 worker.state == WorkerState.ALIVE) { for (app - waitingApps if app.coresLeft 0) { if (canUse(app, worker)) { val coresToUse = math.min(worker.coresFree, app.coresLeft) if (coresToUse 0) { val exec = app.addExecutor(worker, coresToUse) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING
launchExecutor主要负责两件事情:
记录下新添加的executor使用掉的cpu core和内存数目,记录过程发生在worker.addExecutor 向worker发送LaunchExecutor指令def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
worker在收到LaunchExecutor指令后,也会记一笔账,将要使用掉的cpu core和memory从可用资源中减去,然后使用ExecutorRunner来负责生成Executor进程,注意Executor运行于独立的进程。代码如下
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) = if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, appDesc.sparkHome.map(userSparkHome = new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, conf, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } catch { case e: Exception = { logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
在资源分配过程中需要注意到的是如果有多个Driver Application处于等待状态,资源分配的原则是FIFO,先到先得。
资源回收过程worker中上报的资源最终被driver application中提交的job task所占用,如果application结束(包括正常和异常退出),application所占用的资源就应该被顺利回收,即将占用的资源重新归入可分配资源行列。
现在的问题转换成Master和Executor如何知道Driver Application已经退出了呢?
有两种不同的处理方式,一种是先道别后离开,一种是不告而别。现分别阐述。
何为先道别后离开,即driver application显式的通知master和executor,任务已经完成了,我要bye了。应用程序显式的调用SparkContext.stop
def stop() { postApplicationEnd() ui.stop() // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler dagScheduler = null if (dagSchedulerCopy != null) { metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() listenerBus.stop() eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") } else { logInfo("SparkContext already stopped")
显式调用SparkContext.stop的一个主要功能是会去显式的停止Executor,具体下达StopExecutor指令的代码见于CoarseGrainedSchedulerBackend中的stop函数
override def stop() { stopExecutors() try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) Await.ready(future, timeout) } catch { case e: Exception = throw new SparkException("Error stopping standalone schedulers driver actor", e)
那么Master又是如何知道Driver Application退出的呢?这要归功于Akka的通讯机制了,当相互通讯的任意一方异常退出,另一方都会收到DisassociatedEvent, Master也就是在这个消息处理中移除已经停止的Driver Application。
case DisassociatedEvent(_, address, _) = { // The disconnected client couldve been either a worker or an app; remove whichever it was logInfo(s"$address got disassociated, removing it.") addressToWorker.get(address).foreach(removeWorker) addressToApp.get(address).foreach(finishApplication) if (state == RecoveryState.RECOVERING canCompleteRecovery) { completeRecovery() }
不告而别的方式下Executor是如何知道自己所服务的application已经顺利完成使命了呢?道理和master的一样,还是通过DisassociatedEvent来感知。详见CoarseGrainedExecutorBackend中的receive函数
case x: DisassociatedEvent = logError(s"Driver $x disassociated! Shutting down.") System.exit(1)异常情况下的资源回收
由于Master和Worker之间的心跳机制,如果worker异常退出, Master会由心跳机制感知到其消亡,进而将其上报的资源移除。
Executor异常退出时,Worker中的监控线程ExecutorRunner会立即感知,进而上报给Master,Master会回收资源,并重新要求worker启动executor。
在阿里云ECS上配置Apache+wsgi实现blog的部署 利用Django框架搭建个人博客网站,将网站通过Apache+wsgi部署到阿里云服务器。主要采用html、css、javascript作为前端,并使用了JQuery框架和Bootstrap框架;采用django框架作为后台开发技术、后台数据库使用mysql。本篇幅着重于Django框架介绍、数据库mysql配置和服务器部署。
相关文章
- 使用 等空格实现最小成本中文对齐
- Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &WEB UI和Metrics初始化及数据更新过程分析
- Apache Spark技术实战(三)利用Spark将json文件导入Cassandra &SparkR的安装及使用
- Apache Storm 衍生项目 & Apache Flink初接触
- [AngularJS] Services, Factories, and Providers -- value & Providers
- Interop type 'Microsoft.Office.Interop.Word.ApplicationClass' cannot be embedded. Use the applicable
- 华为OD机试 - 整型数组按个位值排序(Java & JS & Python)
- 华为OD机试 - 在字符串中找出连续最长的数字串(含“+-”号)(Java & JS & Python)
- 华为OD机试 - 组成最大数(Java & JS & Python)
- 适合做app的前端框架有哪些?webAPP&移动端App:react native、weex、flutter
- ML之PDP/ICE/PFI/GS&LS/LIME/SHAP:《Interpretability Methods in Machine Learning: A Brief Survey机器学习可解释性
- Django 认证系统 cookie & session & auth模块
- 线程异常:undefined reference to 'pthread_create' 处理
- consul分布式集群搭建&简单功能测试&故障恢复【h】
- Eureka&Zookeeper&Consul 原理与对比
- Android 8.1 Sim卡联通3G&4G信号增强