zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python小案例(十)利用PySpark循环写入数据

Python案例循环数据 利用 写入 Pyspark
2023-06-13 09:17:15 时间

Python小案例(十)利用PySpark循环写入数据

在做数据分析的时候,往往需要回溯历史数据。但有时候构建历史数据时需要变更参数重复跑数,公司的数仓调度系统往往只支持日期这一个参数,而且为临时数据生产调度脚本显得有点浪费。这个时候就可以结合python的字符串格式化和PySpark的Hive写入,就可以完成循环写入临时数据。

⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接企业hive集群的

案例一:多参数循环写入临时表

案例背景:写入每天的热搜数据,热搜类型分为当日、近1日、近2日、近3日。这里为了方便,简化了循环的力度。

from pyspark.sql import *
# spark配置
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# 导入其他相关库
import pandas as pd
from datetime import datetime
# sql创建临时表
sql_create = '''
CREATE TABLE temp.loop_write_example
    (
        cnt string comment "近n日cnt"
    )
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_create)
DataFrame[]

构造日期'{dt}'和热搜类型{num}两个参数

# sql写入临时表
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})


select
    sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
    temp.loop_write_example_fake_data
where
    dt between date_add('{dt}',-4) and '{dt}'
'''
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # 日期范围
# 循环写入临时表
for point_date in dates:
    if point_date>='2021-01-01' and point_date<'2021-01-03':
        for dtype in range(0,4):
            start_time = datetime.now()
            spark.sql(sql_insert.format(dt=point_date, num=dtype))
            end_time=datetime.now()
            print (point_date, dtype, "succeed", '耗时'+str((end_time-start_time).seconds)+'秒')
2021-01-01 0 succeed 耗时8秒
2021-01-01 1 succeed 耗时7秒
2021-01-01 2 succeed 耗时8秒
2021-01-01 3 succeed 耗时8秒
2021-01-02 0 succeed 耗时8秒
2021-01-02 1 succeed 耗时8秒
2021-01-02 2 succeed 耗时8秒
2021-01-02 3 succeed 耗时8秒

案例二:并发批量写入hdfs

案例背景:将2亿+题目按规则分批写入hdfs,供研发通过接口查询,每个hdfs要求最大1000w。

from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

import math
import pandas as pd
from datetime import datetime
import time
import os
# 为了方便,通过规则生成的数据存入临时表temp.hh_qids中,规则细节无需了解
# 查看数据量级
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # 获取数据量级
print(N)
273230858
# 创建表,通过参数i生成表后缀
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
    (
        questionid string comment "题目ID"
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
# 写入表,写入上述创建的临时表
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}

select
    questionid
from
    temp.hh_qids
where
    ceil(rn/10000000)={i}
order by
    questionid
limit 100000000
'''
  • 循环写入
%%time

# 通过循环创建多个临时表并写入
for i in range(1,math.ceil(N/10000000)+1):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # 创建表
    spark.sql(insert_sql.format(i=i)) # 写入表
    
    end_time=datetime.now()
    
    print(f"成功写入hh_mult_write_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒')
成功写入hh_mult_write_1,耗时38秒
成功写入hh_mult_write_2,耗时59秒
成功写入hh_mult_write_3,耗时36秒
成功写入hh_mult_write_4,耗时34秒
成功写入hh_mult_write_5,耗时29秒
成功写入hh_mult_write_6,耗时26秒
成功写入hh_mult_write_7,耗时44秒
成功写入hh_mult_write_8,耗时43秒
成功写入hh_mult_write_9,耗时32秒
成功写入hh_mult_write_10,耗时49秒
成功写入hh_mult_write_11,耗时33秒
成功写入hh_mult_write_12,耗时34秒
成功写入hh_mult_write_13,耗时38秒
成功写入hh_mult_write_14,耗时24秒
成功写入hh_mult_write_15,耗时40秒
成功写入hh_mult_write_16,耗时34秒
成功写入hh_mult_write_17,耗时39秒
成功写入hh_mult_write_18,耗时45秒
成功写入hh_mult_write_19,耗时50秒
成功写入hh_mult_write_20,耗时35秒
成功写入hh_mult_write_21,耗时46秒
成功写入hh_mult_write_22,耗时38秒
成功写入hh_mult_write_23,耗时29秒
成功写入hh_mult_write_24,耗时31秒
成功写入hh_mult_write_25,耗时28秒
成功写入hh_mult_write_26,耗时36秒
成功写入hh_mult_write_27,耗时32秒
成功写入hh_mult_write_28,耗时17秒
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s

这次通过大量级数据实战演示,可以发现效率还可以,写入28个文件仅需17min 15s。但日常业务中可能存在更复杂的写入或者更大的量级,那有没有办法提高效率呢? 大家都知道python的循环是单线程的,在一次循环结束前是不会调起下次循环的。而调度系统一般也可以支持并发,那python是不是也能通过并发实现多线程呢?当然可以了,方法有不少,但我实验后发现还是joblib好用。

这里通过一个简单的小case演示joblib的效果

# 查看集群服务器cpu数量
print(os.cpu_count())

48

%%time

# 查看简单循环的执行时间:15s
for i in range(5):
 for j in range(3):
     time.sleep(1)
     print(i*j)

0 0 0 0 1 2 0 2 4 0 3 6 0 4 8 CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms Wall time: 15 s

%%time

# 查看多线程下的执行时间:1.35s(好家伙,快了10倍多!)
from joblib import Parallel, delayed

def product2(x,y):
 time.sleep(1)
 return x*y

# n_jobs=-1表示使用全部cpu
Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))

CPU times: user 111 ms, sys: 233 ms, total: 344 ms Wall time: 1.35 s

[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]

大家可以看到,提速效果还是杠杠滴,那实际应用会不会也如此优秀呢?

  • 并发写入
# 构造函数-将单次循环的主要过程包装成函数以便Parallel调用
def creat_insert(i):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # 创建表
    spark.sql(insert_sql.format(i=i)) # 写入表
    
    end_time=datetime.now()
    
    print_str = f"成功写入hh_mult_test_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒'
    return print_str
%%time

# 并发写入
from joblib import Parallel, delayed

# 集群服务器大家都在用,在做大任务处理时,不建议使用全部cpu,这里使用一半足矣
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000)+1))
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s

['成功写入hh_mult_test_1,耗时44秒',
 '成功写入hh_mult_test_2,耗时41秒',
 '成功写入hh_mult_test_3,耗时83秒',
 '成功写入hh_mult_test_4,耗时49秒',
 '成功写入hh_mult_test_5,耗时89秒',
 '成功写入hh_mult_test_6,耗时71秒',
 '成功写入hh_mult_test_7,耗时89秒',
 '成功写入hh_mult_test_8,耗时72秒',
 '成功写入hh_mult_test_9,耗时83秒',
 '成功写入hh_mult_test_10,耗时77秒',
 '成功写入hh_mult_test_11,耗时80秒',
 '成功写入hh_mult_test_12,耗时65秒',
 '成功写入hh_mult_test_13,耗时53秒',
 '成功写入hh_mult_test_14,耗时109秒',
 '成功写入hh_mult_test_15,耗时81秒',
 '成功写入hh_mult_test_16,耗时73秒',
 '成功写入hh_mult_test_17,耗时41秒',
 '成功写入hh_mult_test_18,耗时78秒',
 '成功写入hh_mult_test_19,耗时84秒',
 '成功写入hh_mult_test_20,耗时93秒',
 '成功写入hh_mult_test_21,耗时68秒',
 '成功写入hh_mult_test_22,耗时78秒',
 '成功写入hh_mult_test_23,耗时48秒',
 '成功写入hh_mult_test_24,耗时88秒',
 '成功写入hh_mult_test_25,耗时54秒',
 '成功写入hh_mult_test_26,耗时59秒',
 '成功写入hh_mult_test_27,耗时62秒',
 '成功写入hh_mult_test_28,耗时37秒']

可以看到,每个文件的写入时间与循环差不多,都是在60秒左右。但整体只花了1min 49s,快了10倍以上。

  • 删除测试数据
%%time

# 测试数据量较大,无端占用公司资源是不对的,所以需要删除下。
# 但要我手动一个个删除那也是不可能的,做个简单的for循环即可
for i in range(1,29):
    drop_sql='''
    DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
    '''
    
    spark.sql(drop_sql.format(i=i)) # 删除表
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms

总结

至此,python小案例系列也结束了,案例基本来源于我的日常业务。在处理复杂需求,提升工作效率方面,Python还是有一席之地的。不知道大家有没有什么实用的python处理日常需求的小案例呢?

共勉~