Spark SQL中UDF和UDAF
2023-09-27 14:19:38 时间
转载自:https://blog.csdn.net/u012297062/article/details/52227909
UDF: User Defined Function,用户自定义的函数,函数的输入是一条具体的数据记录,实现上讲就是普通的Scala函数;
UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作;
实质上讲,例如说UDF会被Spark SQL中的Catalyst封装成为Expression,最终会通过eval方法来计算输入的数据Row(此处的Row和DataFrame中的Row没有任何关系)
不说太多直接上代码
1、创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息
val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("SparkSQLUDFUDAF") //设置应用程序的名称,在程序运行的监控界面可以看到名称 //conf.setMaster("spark://DaShuJu-040:7077") //此时,程序在Spark集群 conf.setMaster("local[4]")
2、创建SparkContext对象和SQLContext对象
//创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息 val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //构建SQL上下文
3、模拟实际使用的数据
val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")
4、基于提供的数据创建DataFrame
val bigDataRDD = sc.parallelize(bigData) val bigDataRDDRow = bigDataRDD.map(item => Row(item)) val structType = StructType(Array(StructField("word", StringType, true))) val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType)
5、注册成为临时表
bigDataDF.registerTempTable("bigDataTable")
6、通过SQLContext注册UDF,在Scala 2.10.x版本UDF函数最多可以接受22个输入参数
sqlContext.udf.register("computeLength", (input: String) => input.length) //直接在SQL语句中使用UDF,就像使用SQL自动的内部函数一样 sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show
7、通过SQLContext注册UDAF
sqlContext.udf.register("wordCount", new MyUDAF) sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length" + " from bigDataTable group by word").show()
8、按照模板实现UDAF
class MyUDAF extends UserDefinedAggregateFunction { // 该方法指定具体输入数据的类型 override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true))) //在进行聚合操作的时候所要处理的数据的结果的类型 override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true))) //指定UDAF函数计算后返回的结果类型 override def dataType: DataType = IntegerType // 确保一致性 一般用true override def deterministic: Boolean = true //在Aggregate之前每组数据的初始化结果 override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0} // 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 // 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Int](0) + 1 } //最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0) } //返回UDAF最后的计算结果 override def evaluate(buffer: Row): Any = buffer.getAs[Int](0) }
相关文章
- Spark SQL数据源
- Spark SQL与Hive on Spark的比较
- 检测Sql Server服务器SQL语句执行情况
- SQL查询结果自定义排序
- sql视图语句
- [Spark] 03 - Spark SQL
- SQL Server查询性能优化——覆盖索引(二)
- SQL表名,应该用复数还是单数
- SP2-0642: SQL*Plus internal error state 2130, context 0:0:0
- SQL高级查询技巧
- SQL中多条件查询括号的用途
- 【MySQL缓存】怎么验证当前SQL是否走Buffer Pool缓存?
- sql典例分析
- PL/SQ 基础 - 精通Oracle10g PL/SQL编程
- go的database/sql库中db.Exce()
- (转载)SQL的各种连接Join详解
- 使用Docker运行SQL Server
- 数据库-sql-面试-rank
- SPARK 2.2.1 SQL处理各种数据源的案例与解读
- 第70课:Spark SQL内置函数解密与实战 每天晚上20:00YY频道现场授课频道68917580
- 第67课:Spark SQL下采用Java和Scala实现Join的案例综合实战(巩固前面学习的Spark SQL知识)
- 《Spark商业案例与性能调优实战100课》第9课:商业案例之通过Spark SQL 下两种不同方式实现口碑最佳和最热门电影比较
- 大数据Spark “蘑菇云”行动补充内容第70课: Spark SQL代码实战和性能调优 4个spark sql调优技巧有用!!!!
- SPARK 第4期:通过案例实战掌握spark sql(dataframe)
- 3 分钟的高速体验 Apache Spark SQL
- 关于CarbonData+Spark SQL的一些应用实践和调优经验分享
- Sql Server系列:嵌套查询
- 【原创 Hadoop&Spark 动手实践 9】Spark SQL 程序设计基础与动手实践(上)