PySpark Getting Started Tutorial (fairly comprehensive)

from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])

# count the number of elements in the RDD
# counts = words.count()
# print("Number of elements in RDD -> %i" % counts)

# collect and print all the elements
# coll = words.collect()
# print("Elements in RDD -> %s" % coll)

# foreach(func) applies func to every element; here it prints each one.
# Note that foreach returns None, so wrapping the call in print() is unnecessary - the printing happens inside the lambda.
# words.foreach(lambda x: print(x))

# filter(f) returns a new RDD containing only the elements that satisfy the predicate. In the example below, we keep the strings that contain 'spark'.
# print(words.filter(lambda x:"spark" in x).collect())

# map(f, preservesPartitioning=False) returns a new RDD by applying the function to every element of the RDD.
# In the example below, we form key-value pairs by mapping each string to the value 1.
# print(words.map(lambda x:(x,1)).collect())
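# For reference (my addition, not output from the original run), roughly what the actions above return
# when uncommented; collect() preserves the input order here since the data was parallelized from a list:
# words.count()                                   -> 8
# words.collect()                                 -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
# words.filter(lambda x: "spark" in x).collect()  -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
# words.map(lambda x: (x, 1)).collect()           -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]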


# reduce(f) aggregates the elements of the RDD with the given commutative and associative binary operation.
# In the example below, we import add from the operator module and apply it to 'nums' to perform a simple sum.
# It works just like Python's reduce: given the integers [x1, x2, x3], reduce takes the first element as the
# initial result (sum = x1), then applies add to sum and x2 (sum = x1 + x2), and finally applies add to sum
# and x3, giving sum = x1 + x2 + x3.

from pyspark import SparkContext
from operator import add
sc=SparkContext("local","hello pyspark")
nums=sc.parallelize([1,2,3,4,5])
print(nums.reduce(add))
15
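# Equivalent sketch with a plain lambda instead of operator.add (my addition, not in the original post).
# Also note that only one SparkContext can be active per session; each snippet in this post was run in a
# fresh session, so reuse the existing sc (or call sc.stop() first) if you run them back to back.
print(nums.reduce(lambda a, b: a + b))   # 15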
# join(other, numPartitions=None) returns an RDD of (key, (value1, value2)) pairs for every key that appears in both RDDs, with all the values for that key.
from pyspark import SparkContext
sc=SparkContext("local","join test")
rdd1=sc.parallelize([("hadoop",1),("spark",2)])
rdd2=sc.parallelize([("hadoop",2),("spark",4)])
rdd3=rdd1.join(rdd2)
print(rdd3.collect())
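# The collect() above should return [('hadoop', (1, 2)), ('spark', (2, 4))] (ordering may vary).
# join() is an inner join: keys present in only one RDD are dropped. A minimal sketch of that
# (my addition; "flink" is just an illustrative extra key):
rdd4=sc.parallelize([("hadoop",1),("spark",2),("flink",3)])
print(rdd4.join(rdd2).collect())   # e.g. [('hadoop', (1, 2)), ('spark', (2, 4))] - 'flink' has no match, so it is dropped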
# distinct() removes duplicate elements (deduplication)
from pyspark import SparkContext
sc=SparkContext("local","distinct test")
rdd=sc.parallelize([1,1,2,3,3,4,5])
print(rdd.distinct().collect())
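# The collect() above should return the five unique values [1, 2, 3, 4, 5] (ordering is not guaranteed).
# distinct() also pairs naturally with count() to get the number of unique elements (my addition):
print(rdd.distinct().count())   # 5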
# randomSplit(weights) randomly splits the RDD into several RDDs according to the given weights; here we split rdd1 into two RDDs with weights 0.4 and 0.6 and print both.
from pyspark import SparkContext
sc=SparkContext("local","randomSplit test")
rdd1=sc.parallelize([1,2,3,4,5,6,3,2,5,8,7,5,6,4,9])
rdd2=rdd1.randomSplit([0.4,0.6])
print(len(rdd2))
print("="*20)
print(rdd2[0].collect())
print(rdd2[1].collect())
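# Because the split is random, the two parts (and their sizes) differ from run to run and only roughly
# follow the 0.4/0.6 weights. randomSplit also accepts a seed for reproducible splits (a sketch, my addition):
rdd3=rdd1.randomSplit([0.4,0.6],seed=42)
print(rdd3[0].collect())
print(rdd3[1].collect())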
# groupBy(f) groups the data according to the rule given by the passed-in function; the code below splits rdd1 into even and odd numbers.
from pyspark import SparkContext
sc=SparkContext("local","groupBy test")
rdd1=sc.parallelize([1,2,3,4,5,6,3,2,5,8,7,5,6,4,9])
rdd2=rdd1.groupBy(lambda x:x%2)
print(sorted([(x, sorted(y)) for (x, y) in rdd2.collect()]))
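# The print above should give [(0, [2, 2, 4, 4, 6, 6, 8]), (1, [1, 3, 3, 5, 5, 5, 7, 9])].
# groupBy() returns (key, ResultIterable) pairs, which is why the line above wraps each group in sorted()
# before printing. An alternative sketch (my addition) that materializes each group as a plain list:
print(rdd2.mapValues(list).collect())   # e.g. [(0, [2, 4, 6, 2, 8, 6, 4]), (1, [1, 3, 5, 3, 5, 7, 5, 9])] (ordering may vary)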
str="[(1,[6,7,8,9]),(2,[9,6,7,8,9])]"
for(x,y) in eval(str):
    print("%s==>%s"%(x,y))
1==>[6, 7, 8, 9]
2==>[9, 6, 7, 8, 9]
str="[(1,[6,7,8,9]),(2,[9,6,7,8,9])]"
print(sorted([(x,sorted(y)) for(x,y) in eval(str)]))
[(1, [6, 7, 8, 9]), (2, [6, 7, 8, 9, 9])]
# union() concatenates the two RDDs; duplicates are kept
from pyspark import SparkContext
sc=SparkContext("local","union")
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([1,2,11,22,33,44,55])
rddunion=rdd1.union(rdd2)
print(rddunion.collect())
[1, 2, 3, 4, 5, 1, 2, 11, 22, 33, 44, 55]
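# union() keeps duplicates (1 and 2 appear twice above). For a set-style union, chain distinct()
# (a sketch, my addition):
print(rdd1.union(rdd2).distinct().collect())   # e.g. [1, 2, 3, 4, 5, 11, 22, 33, 44, 55] (ordering may vary)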
# intersection() returns the elements that appear in both RDDs
from pyspark import SparkContext
sc=SparkContext("local","intersection test")
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([1,2,11,22,33,44,55])
rddintersect=rdd1.intersection(rdd2)
print(rddintersect.collect())
[2, 1]
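# intersection() removes duplicates from the result even when both inputs contain them
# (a minimal sketch, my addition):
rdd3=sc.parallelize([1,1,2,2,3])
rdd4=sc.parallelize([1,1,2,5])
print(rdd3.intersection(rdd4).collect())   # e.g. [1, 2] (ordering may vary)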
# subtract() returns the elements of rdd1 that do not appear in rdd2
from pyspark import SparkContext
sc=SparkContext("local","subtract test")
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([1,2,11,22,33,44,55])
rddsubtract=rdd1.subtract(rdd2)
print(rddsubtract.collect())
[4, 3, 5]
# cartesian() returns the Cartesian product: every (x, y) pair with x from rdd1 and y from rdd2
from pyspark import SparkContext
sc=SparkContext("local","cartesian test")
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([1,2,11,22,33,44,55])
rddcartesian=rdd1.cartesian(rdd2)
print(rddcartesian.collect())
[(1, 1), (1, 2), (1, 11), (1, 22), (1, 33), (1, 44), (1, 55), (2, 1), (2, 2), (2, 11), (2, 22), (2, 33), (2, 44), (2, 55), (3, 1), (3, 2), (3, 11), (3, 22), (3, 33), (3, 44), (3, 55), (4, 1), (4, 2), (4, 11), (4, 22), (4, 33), (4, 44), (4, 55), (5, 1), (5, 2), (5, 11), (5, 22), (5, 33), (5, 44), (5, 55)]
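# Sanity check (my addition): the Cartesian product has len(rdd1) * len(rdd2) = 5 * 7 = 35 pairs.
print(rddcartesian.count())   # 35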

Reference: https://blog.csdn.net/wapecheng/article/details/107472312