pyspark dataframe 常用操作
spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。当然主要对类SQL的支持。
在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选、合并,重新入库。
首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数。
而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中。
1、union、unionAll、unionByName,row 合并(上下拼接)
data_all = data_neg.unionByName(data_pos)
2、dataframe 样本抽样
data_all.sample(False, 0.5, 1000).count()
3、条件过滤
data_all.filter("label >= 1").count()
4、注册为临时表,再使用spark.sql 对dataframe进行操作
res = predictions.select("user_log_acct", split_udf('probability').alias('probability'))
res.registerTempTable("tmp")
spark.sql("insert overwrite table dev.dev_result_temp select user_log_acct,probability from tmp")
spark.stop()
创建和保存spark dataframe:
spark.createDataFrame(data, schema=None, samplingRatio=None),直接创建
其中data是行或元组或列表或字典的RDD、list、pandas.DataFrame。
df = spark.createDataFrame([ (1, 144.5, 5.9, 33, 'M'), (2, 167.2, 5.4, 45, 'M'), (3, 124.1, 5.2, 23, 'F'), (4, 144.5, 5.9, 33, 'M'), (5, 133.2, 5.7, 54, 'F'), (3, 124.1, 5.2, 23, 'F'), (5, 129.2, 5.3, 42, 'M'), ], ['id', 'weight', 'height', 'age', 'gender']) #直接创建Dataframe df = spark.createDataFrame([{'name':'Alice','age':1}, {'name':'Polo','age':1}]) #从字典创建 schema = StructType([ StructField("id", LongType(), True), StructField("name", StringType(), True), StructField("age", LongType(), True), StructField("eyeColor", StringType(), True) ]) df = spark.createDataFrame(csvRDD, schema) #指定schema。
spark.read 从文件中读数据
>>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t') >>> rdd = sc.textFile('python/test_support/sql/ages.csv') #可以用这种方法将用逗号分隔的rdd转为dataframe >>> df2 = spark.read.csv(rdd) >>> df = spark.read.format('json').load('python/test_support/sql/people.json') >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes [('age', 'bigint'), ('name', 'string')] >>> rdd = sc.textFile('python/test_support/sql/people.json') >>> df2 = spark.read.json(rdd) >>> df = spark.read.text('python/test_support/sql/text-test.txt') >>> df.collect() [Row(value='hello'), Row(value='this')] >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True) >>> df.collect() [Row(value='hello\nthis')]
Spark function
1)foreach(f),应用f函数,将df的每一行作为f函数的输入
例如:
def f(person):
print(person.name)
df.foreach(f)
2) apply(udf)
3) map(f),应用f函数,作用对象为rdd的每一行
参考:https://blog.csdn.net/kittyzc/article/details/82862089
相关文章
- Linux服务器管理员操作
- 【说站】JavaScript属性描述对象的操作
- Redis相关特性-多数据库及key常用操作
- Vim编辑器及常用操作
- Guava中这些Map的骚操作,让我的代码量减少了50%
- MongoDB入门到进阶笔记:03-MongoDB的客户端使用 常用操作
- mongodb 数据库操作–备份 还原 导出 导入
- Json数据常用操作详解编程语言
- RandomUtil随机操作的相关的工具类常用方法详解编程语言
- 卸载深度操作系:Linux快速复原(卸载深度linux)
- 深入探究MongoDB中的时间操作(mongodb时间)
- MySQL查询中的替换操作(mysql查询替换)
- Oracle触发器:行动事件触发,实现高效数据库操作。(oracle触发器行)
- 深入剖析MySQL,优化数据库操作,提高效率与安全性的好处(mysql好处)
- MySQL修改记录查询常用操作方案(mysql中修改记录查询)
- 使用JSTL操作Oracle数据库(jstl oracle)
- MySQL三表删除操作详解(mysql 三表删除)
- 操作Oracle数据库中常用的连接操作简介(oracle中常用的连接)
- 的字段Oracle中操作带时分秒字段的实用方法(oracle中带时分秒)
- Redis的性能消耗及其优化操作(redis耗性能操作)
- FileSystem对象常用的文件操作函数有哪些?
- extJs常用到的增,删,改,查操作代码
- jdbc操作数据库的基本流程详解
- jquery常用操作小结