zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

Spark 操作练习

Spark 操作 练习
2023-06-13 09:17:25 时间
# coding=utf-8

from pyspark import SparkConf, SparkContext
from pyspark import Row
from pyspark.sql import SparkSession

# 初始化spark,生成一个sparkcontext
sc = SparkContext()

print "======================\n========================\n======================\n"
print "=======firt part======\n"

# 用sc创建一个RDD -- resilient distributed dataset
lines = sc.textFile("D:/spark-2.1.2-bin-hadoop2.7/bin/readme.txt")

# RDD支持转化操作和行动操作
# 转化操作是返回一个新的RDD
# 行动操作是向驱动器程序返回结果,或将结果写入输出,会触发实际的计算

# 转化操作例子:filter
pyline = lines.filter(lambda line: "a" in line)

# 行动操作:
c = pyline.first()
count = pyline.count()
print c
print count

sq_only = lines.distinct()
print sq_only.collect()

print "=======second part======\n"

nums = sc.parallelize([1, 2, 3, 4, 4, 4])

sq_rdd = nums.map(lambda x: x * x)
sq = sq_rdd.collect()  # map是转化操作,collect是行动操作

# 注意:collect用于获取整个RDD的数据,只有确保本地机器可以放得下所有数据时才可以使用该函数

for i in sq:
    print i

nums_2 = sc.parallelize([4, 5, 6, 7])

#  union() 生成一个包含两个RDD中所有元素的RDD
number_all = nums.union(nums_2).distinct()
print type(number_all)
for i in number_all.collect():
    print i
# intersection() 求两个RDD共同元素的RDD
number_in = nums.intersection(nums_2)

number_dis = number_all.subtract(nums_2)  # number_all没有变化

print number_dis.collect()

print "=======third part======\n"

lin2 = sc.parallelize(["hello message", "hi fank", "one"])
# flatmap 将函数应用于RDD中的每一个元素,将返回的迭代器的所有内容构成新的RDD
words = lin2.flatMap(lambda line: line.split(" "))

# 计数
print words.count()
print words.collect()

# 求和
sum_num = number_all.reduce(lambda x, y: x + y)
print sum_num

# 统计
value_cnt = nums.countByValue()
print value_cnt

print "=======fourth part======\n"
# 键值对操作
# 用map生成一个键值对

pairs = lines.map(lambda x: (x.split(" ")[0], x))

pairs_1 = sc.parallelize([('c', 7), ('b', 1), ('d', 3)])
pairs2 = sc.parallelize([('a', 3), ('b', 4), ('a', 1), ('c', 6)])

# 合并相同键的值
pairs_3 = pairs2.reduceByKey(lambda x, y: x + y)
print pairs_3.collect()

# 按键值分组
pairs_4 = pairs2.groupByKey()
print pairs_4.collect()

# 对每个值应用函数
pairs_5 = pairs2.mapValues(lambda x: x ** 2)
print pairs_5.collect()

# 获取返回key值的RDD

pairs_key = pairs2.keys()
print pairs_key.collect()

values = pairs2.values()

# 聚合

pair_animal = sc.parallelize([('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)])
# 统计pair rdd中每个键对应的值的和并计数,可用于求平均
animal_a = pair_animal.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
animal_avg = animal_a.mapValues(lambda x: x[0] / float(x[1]))

print animal_avg.collect()

print animal_avg.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)).collect()

'''
# 针对2个 pari RDD 的转化操作

# substractByKey ,删掉RDD中与other RDD 键相同的元素


# join
pairs_all=pairs_1.join(pairs2)
for i in pairs_all.collect():
    print i[1]               # (’c',(7,6))

# rightOuterJoin  右外连接
pairs_right=pairs_1.rightOuterJoin(pairs2)
for i in pairs_right.collect():
    print i[1]

# lefOuterJoin 左外连接
pairs_left=pairs_1.leftOuterJoin(pairs2)
for i in pairs_left.collect():
    print i[1]

# 从hdfs获取文件

# hdfs_file = sc.textFile(
#     "hdfs://sh.hdfs.cr.ied.com:9000/tdw-transfer-data/ihocpro/20171030111201736/train_result/2017112301/model_param.csv")
# print "======test HDFS====="
# print hdfs_file.collect()

# DataFrame及spark sql
# 从文件生成DataFrame

# 用sc创建一个RDD -- resilient distributed dataset
table_rdd = sc.textFile("D:/spark-2.1.2-bin-hadoop2.7/bin/people.txt")
people_sp = table_rdd.map(lambda r: r.split(" "))
people = people_sp.map(lambda p: Row(name=p[0], age=int(p[1]),country=p[2]))

# 创建DataFrame的方法
# 首先创建一个sparksession,不然没有toDF方法
print hasattr(table_rdd,"toDF") #验证rdd是否有toDF方法
spark=SparkSession(sc)
print hasattr(table_rdd,"toDF")


# 方法1:toDF()
df_people = people.toDF()
print df_people.show()
'''
+---+-----+
|age| name|
+---+-----+
| 33|  jim|
| 34|  tom|
| 23|alice|
| 19|gorge|
| 41|saddy|
| 55|marry|
+---+-----+
'''
# 创建datafram的方法二:createDataFrame

df_people2=spark.createDataFrame(people)
# 建立视图
df_people2.createOrReplaceTempView("people")
# 执行sql查询
print spark.sql("select name,age from people where age >30").show()
'''
+-----+---+
| name|age|
+-----+---+
|  jim| 33|
|  tom| 34|
|saddy| 41|
|marry| 55|
+-----+---+
'''
df_people2.groupBy("country")
#执行sql会产生新的dataframe
group_p=spark.sql("select country,count(name) from people group by country")
print group_p.show()
'''
+-------+-----------+
|country|count(name)|
+-------+-----------+
| Janpan|          2|
|  Frach|          1|
|America|          1|
|  china|          2|
|England|          1|
+-------+-----------+
'''
# dataframe转化为rdd
print group_p.rdd.collect()

# 获取列
print group_p.select(group_p.country).alias("CON").collect()
#