zl程序教程

您现在的位置是:首页 >  其它

当前栏目

SparkCore 之旅

之旅
2023-09-14 09:14:49 时间

Spark概述

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

在这里插入图片描述

历史

Hadoop 历史

在这里插入图片描述

2011 年 发布 1.x 版本

  • NameNode 不能高可用
  • MR 框架将资源调度任务调度耦合在一起
  • MR 框架基于磁盘计算,性能比较低

2013 年 发布 2.x 版本

  • NameNode 高可用
  • 将资源调度和任务调度解耦
  • 计算框架可插拔

Spark 历史

在这里插入图片描述

Hadoop :

2013 年 2.x 版本 ( YARN )

Spark :

2013 年 成为 Apache 孵化项目, 将资源和任务调度分开

对比

Hadoop 的 MR 框架和 Spark 框架都是数据处理框架

MR

  • Hadoop是由 Java 语言编写
  • MR 是一种编程模型
  • MR 采用创建新的进程
  • 多个 MR 作业之间的数据交互依赖于磁盘交互
  • 多个作业之间数据通信是基于磁盘
  • 内存资源不够 , 选择 MR
  • MR 不能处理循环迭代式数据流处理

在这里插入图片描述

Spark

  • Spark 由 Scala 语言开发
  • Spark 用于数据计算
  • Spark 采用 fork 线程的方式
  • Spark 在 shuffle 时将数据写入磁盘
  • Spark 多个作业之间数据通信是基于内存
  • Spark 的缓存机制比 HDFS 的缓存机制高效
  • Spark SQL 操作结构化数据
  • Spark Streaming 对实时数据进行流式计算
  • 支持复杂的数据挖掘算法图形计算算法
  • 支持 迭代式计算,图形计算
  • 支持 机器学习中ALS、凸优化梯度下降
  • Spark 将计算单元缩小到更适合并行计算和重复使用的 RDD 计算模型

在这里插入图片描述

Spark 核心模块

在这里插入图片描述

Spark Core

提供 Spark 的基本功能,如 : 任务调度、内存管理、错误恢复、与存储系统交互、弹性分布式数据集(Resilient Distributed DataSet,RDD),扩展了:Spark SQL,Spark Streaming,GraphX , MLlib

Spark SQL

Spark 操作结构化数据的组件。可以使用 SQL 或 HQL 来查询数据 , 支持多种数据源,如 : Hive表、Parquet 以 JSON

Spark Streaming

对实时数据进行流式计算的组件

Spark MLlib

提供的一个机器学习算法库。如 : 分类、回归、聚类、协同过滤等、模型评估、数据导入

Spark GraphX

面向图计算提供的框架与算法库

集群管理器

可以在一个计算节点到数千个计算节点之间伸缩计算 , 支持在各种集群管理器(Cluster Manager)上运行,如 : Hadoop YARN、Apache Mesos,Spark 独立调度器

Spark快速上手

在生产环境中,通常会在 IDEA 中编制程序,然后打成 Jar 包,然后提交到集群

增加Scala插件

Spark 由 Scala 语言开发的,当前使用的 Spark 版本为3.0.0,默认采用的Scala 编译版本为 2.12,IDEA 开发工具中含有 Scala 开发插件

在这里插入图片描述

增加依赖关系

修改 Maven 项目中的 POM 文件,增加 Spark 框架的依赖关系

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

右击模块 , 点击 Add Framework Support

在这里插入图片描述

在这里插入图片描述

WordCount

流程图 :

在这里插入图片描述

实现一个 WordCount

package com.cpucode.spark.wc.reduceByKey

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : cpucode
 * @date : 2022/2/9 19:52
 * @github : https://github.com/CPU-Code
 * @csdn : https://blog.csdn.net/qq_44226094
 */
object ReduceByKey {
  def main(args: Array[String]): Unit = {

    // Spark是一个计算【框架】
    // 1. 能找到他 :增加依赖
    // 2. 获取Spark的连接(环境)
    val conf = new SparkConf().setMaster("local").setAppName("wordCount")
    //2.创建SparkContext,该对象是提交Spark App的入口
    val context = new SparkContext(conf)

    // 读取文件
    val lines = context.textFile("wordCount/src/main/resources/word.txt")

    // 将文件中的数据进行了分词
    val word = lines.flatMap(_.split(" "))
    // word => (word, 1)
    val wordToOne = word.map((_, 1))

    // reduceByKey : 按照key分组, 对相同的key的value进行reduce
    // (word, 1)(word, 1)(word, 1)(word, 1)(word, 1)
    // reduce(1,1,1,1,1)
    // 框架的核心就是封装
    val wordCount = wordToOne.reduceByKey(_ + _)

    // 将统计结果打印在控制台上
    wordCount.collect().foreach(println)

	//8.关闭连接
    context.stop()
  }
}

执行过程中,会产生的执行日志,在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

链接 :

https://github.com/CPU-Code/spark

Spark运行环境

Spark 作为一个数据处理计算引擎,可以运行多个环境下

在这里插入图片描述

部署模式 :

  • Local 模式:在本地部署单个 Spark 服务
  • Standalone 模式:Spark 自带的任务调度模式(国内常用)
  • YARN 模式:Spark 使用 Hadoop 的 YARN 组件进行资源与任务调度(国内常用)
  • Mesos 模式:Spark 使用 Mesos 平台进行资源与任务的调度
  • K8S 模式:
  • Windows 模式:

Local模式

Local 模式运行在一台计算机上的模式,常用于在本机上练手和测试

解压缩文件

spark-3.0.0-bin-hadoop3.2.tgz 文件上传到 Linux 并解压缩,放置在指定位置,路径中不要包含中文或空格

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/

在这里插入图片描述

cd /opt/module

重命名

mv spark-3.0.0-bin-hadoop3.2/ spark-3.0.0-local/

在这里插入图片描述

启动Local环境

./bin/spark-shell

在这里插入图片描述

启动成功后,可以输入网址进行 Web UI 监控页面访问

http://cpucode101:4040

在这里插入图片描述

命令行工具

data 目录中,添加 word.txt 文件

vim word.txt
cpu text code
cpucode cpu
code

在这里插入图片描述

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

在这里插入图片描述

在这里插入图片描述

退出本地模式

按键

Ctrl+C

输入Scala指令

:quit

在这里插入图片描述

提交应用

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  • --class : 执行程序的主类
  • --master local[2] : 部署模式,默认 : 本地模式,数字 : 分配的虚拟CPU核数量
  • local: 不指定线程数,所有计算运行在一个线程当中,无并行计算
  • local[*] :默认模式。根据 CPU 核来设置线程数。如 : 8 核,自动设置 8 个线程
  • spark-examples_2.12-3.0.0.jar : 运行的应用类所在的 jar 包
  • 数字10 : 设定当前应用的任务数量

在这里插入图片描述

Standalone 模式

Spark 的 独立部署(Standalone)模式体现了经典的 master-slave 模式

集群规划 :

cpu101cpu102cpu103
SparkMaster
SparkWorker

Master & Worker 关系 :

在这里插入图片描述

Driver & Executor 关系 :

在这里插入图片描述

部署

https://blog.csdn.net/qq_44226094/article/details/123851417

运行流程

Spark 两种模式 , 区别:Driver程序的运行节点 :

  • standalone-client
  • standalone-cluster

standalone-client

在这里插入图片描述

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cpu101:7077,cpu102:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  • --deploy-mode client : Driver 程序运行在本地客户端

standalone-cluster

在这里插入图片描述

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cpu101:7077,cpu102:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  • --deploy-mode cluster : Driver程序运行在集群

Yarn 模式

Spark 主要是计算框架,而不是资源调度框架,主流 : Yarn

部署

https://blog.csdn.net/qq_44226094/article/details/123851080

K8S & Mesos模式

Mesos 是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核

在这里插入图片描述

容器化部署是目前业界很流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。

容器管理工具中最为流行的就是 Kubernetes(k8s),而 Spark 也在最近的版本中支持了k8s部署模式

https://spark.apache.org/docs/latest/running-on-kubernetes.html

在这里插入图片描述

Windows模式

Spark 提供了可以在 windows 系统下启动本地集群的方式

解压缩文件

将文件 spark-3.0.0-bin-hadoop3.2.tgz 解压缩到无中文无空格的路径中

在这里插入图片描述

启动本地环境

执行解压缩文件路径下 bin 目录中的 spark-shell.cmd 文件,启动 Spark 本地环境

在这里插入图片描述

命令行提交应用

在DOS命令行窗口中执行提交指令

spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10

部署模式对比

模式Spark安装机器数需启动的进程所属者应用场景
Local1Spark测试
Standalone3Master 及 WorkerSpark单独部署
Yarn1Yarn 及 HDFSHadoop混合部署

端口号

  • Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
  • Spark Master 内部通信服务端口号:7077
  • Standalone 模式下,Spark Master Web 端口号:8080(资源)
  • Spark 历史服务器端口号:18080
  • Hadoop YARN 任务运行情况查看端口号:8088
  • Hadoop 历史服务器端口号:19888

Spark运行架构

运行架构

Spark : 一个计算引擎,采用标准 master-slave 的结构

Spark 执行结构 :

  • Driver : master,负责管理整个集群中的作业任务调度
  • Executor : slave,负责实际执行任务

在这里插入图片描述

核心组件

Spark 两个核心组件:

  • Driver
  • Executor

Driver

Driver : 驱使整个应用运行起来的程序 ( Driver 类 )

Spark 驱动器节点 : 执行 Spark 任务中的 main 方法,负责实际代码的执行工作

Driver 执行时主要负责:

  • 将用户程序转化为作业( job )
  • 在 Executor 之间调度任务( task )
  • 跟踪 Executor 的执行情况
  • 通过 UI 展示查询运行情况

Executor

Spark Executor 是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立

Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在

如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行

Executor 有两个核心功能:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算

Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能

环境中还有其他两个核心组件:

  • Master
  • Worker

Master 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM

Worker 也是进程,一个 Worker 运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM

ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时 , 提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况

ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster

Executor与Core(核)

Spark Executor 是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源

资源 : 工作节点 Executor 的内存大小和 使用的虚拟CPU核(Core)数量

应用程序相关启动参数如下:

名称说明
–num-executors配置 Executor 的数量
–executor-memory配置每个 Executor 的内存大小
–executor-cores配置每个 Executor 的虚拟 CPU core 数量

并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行

将整个集群并行执行任务的数量称之为并行度

一个作业到底并行度取决于框架的默认配置 , 应用程序也可以在运行过程中动态修改

有向无环图(DAG)

在这里插入图片描述

大数据计算引擎框架 :

  • 第一代计算引擎 : Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map阶段Reduce阶段。上层应用设法去拆分算法,要在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 DAG 框架克服了弊端
  • 第二代计算引擎 : 支持 DAG 的框架。如 Tez 以及更上层的 Oozie , 是批处理的任务
  • 第三代计算引擎 : Spark 是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算

有向无环图 : 由 Spark 程序直接映射成的数据流的高级抽象模型。就是将整个程序计算的执行过程用图形表示出来 , 用于表示程序的拓扑结构

DAG(Directed Acyclic Graph)有向无环图是由线组成的拓扑图形,该图形具有方向不会闭环

提交流程

提交流程 : 应用程序通过 Spark 客户端提交给 Spark 运行环境执行计算的流程

提交流程是基于 Yarn 环境

在这里插入图片描述

Spark应用程序提交到 Yarn 环境中执行的两种部署执行的方式:

  • Client
  • Cluster

两种模式主要区别在于:Driver程序的运行节点位置

Spark 两种模式 , 区别:Driver程序的运行节点 :

  • yarn-client : Driver 程序运行在客户端,适用于交互、调试
  • yarn-cluster : Driver 程序运行在由 ResourceManager 启动 , 适用于生成

Yarn Client模式

在这里插入图片描述

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

Client 模式将用于监控调度的 Driver 模块在客户端执行,不在 Yarn中,一般用于测试

  • Driver 在任务提交的本地机器上运行
  • Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
  • ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
  • 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行

Yarn Cluster模式

在这里插入图片描述

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

Cluster 模式将用于监控调度的 Driver 模块启动在 Yarn集群资源中执行。一般应用于实际生产环境

  • 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • 随后ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster ,此时的 ApplicationMaster 就是 Driver
  • Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main函数
  • 之后执行到 Action 算子时,触发一个Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行