A simple Spark SQL example
2023-09-14 08:59:49

This example uses the legacy Spark 1.x Java API (`JavaSQLContext`, `JavaSchemaRDD`); Spark 1.3 replaced these classes with `DataFrame`, and later versions with `SparkSession`/`Dataset`.
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

/**
 * When using JavaHiveContext:
 * 1. Add three configuration files to the classpath: hive-site.xml, core-site.xml, hdfs-site.xml
 * 2. Add a dependency on the PostgreSQL or MySQL JDBC driver (for the Hive metastore)
 * 3. Add dependencies on hive-jdbc and hive-exec
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
        // testQueryJson(sqlCtx);
        // testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    // Query JSON-formatted data directly with Spark SQL
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD as a temporary table
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email, id, name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + ","
                    + row.getInt(2) + "," + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }

    // Register and use a custom UDF in Spark SQL
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create an account list and turn it into a schema RDD
        List<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);
        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);
        rdd.registerTempTable("acc");

        // Register the user-defined function
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // Query using the UDF
        List<Row> result = sqlCtx.sql("SELECT strlength(name), name, address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    // Query a table stored in Hive with Spark SQL
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo, bar, name FROM pokes2 LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}
```
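For `testQueryJson` to run, the input file must contain one complete JSON object per line: `jsonFile` parses line-delimited JSON, not a single pretty-printed document. A hypothetical `json.txt` matching the columns queried above might look like this (the records are invented for illustration):

```json
{"id": 1, "name": "lily", "email": "lily@163.com", "address": "gz tianhe"}
{"id": 2, "name": "lucy", "email": "lucy@163.com", "address": "gz yuexiu"}
```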
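The UDF example relies on an `AccountBean` class that the post never shows. Since `applySchema` infers column names and types from JavaBean getters, a minimal sketch consistent with how the bean is used above could be the following; the constructor's field order is taken from the `new AccountBean(...)` call, and everything else is an assumption:

```java
import java.io.Serializable;

// Hypothetical reconstruction of AccountBean, inferred from its usage above.
// applySchema reads the schema from JavaBean-style getters, and the bean
// must be serializable so Spark can ship instances to executors.
public class AccountBean implements Serializable {
    private int id;
    private String name;
    private String email;
    private String address;

    public AccountBean() {
    }

    public AccountBean(int id, String name, String email, String address) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.address = address;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}
```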
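The comment at the top of `SimpleDemo` lists the extra dependencies that `JavaHiveContext` needs. In a Maven build they might be declared roughly as follows; the artifact IDs are the standard Spark 1.x and Hive ones, but the version numbers are placeholders you would align with your cluster:

```xml
<!-- Sketch only: version numbers are assumptions; match them to your cluster. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.13.1</version>
</dependency>
<!-- JDBC driver for the Hive metastore database: MySQL shown; use the
     PostgreSQL driver instead if your metastore runs on PostgreSQL. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
```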