zl程序教程

您现在的位置是:首页 >  后端

当前栏目

《Flink官方文档》Python 编程指南测试版(二)

2023-09-11 14:16:09 时间

最简单的情形是对一个数据集中的元组按照一个或多个域进行分组:

reduced = data \

 .group_by(0) \

 .reduce_group( do something )

数据集中的元组被按照第一个域分组。对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值。

grouped = data \

 .group_by(0,1) \

 .reduce(/*do something*/)

在上面的例子中,数据集的分组基于第一个和第二个域形成的复合关键字,因此,reduce函数输入数据组中,每个元组两个域的值均相同。
关于嵌套元组需要注意:如果你有一个使用了嵌套元组的数据集,指定group_by( index of tuple )操作,系统将把整个元组作为关键字使用。

向Flink传递函数

一些特定的操作需要采用用户自定义的函数,因此它们都接受lambda表达式和rich functions作为输入参数。

data.filter(lambda x: x 5)
class Filter(FilterFunction):

 def filter(self, value):

 return value 5

data.filter(Filter())

Rich functions可以将函数作为输入参数,允许使用broadcast-variables(广播变量),能够由init()函数参数化,是复杂函数的一个可考虑的实现方式。它们也是在reduce操作中,定义一个可选的combine function的唯一方式。
Lambda表达式可以让函数在一行代码上实现,非常便捷。需要注意的是,如果某个操作会返回多个数值,则其使用的lambda表达式应当返回一个迭代器。(所有函数将接收一个collector输入 参数)。

Flink的Python API目前仅支持python中的基本数据类型(int,float,bool,string)以及byte arrays。
运行环境对数据类型的支持,包括序列化器serializer,反序列化器deserializer,以及自定义类型的类。

class MyObj(object):

 def __init__(self, i):

 self.value = i


Tuples/Lists

你可以使用元组(或列表)来表示复杂类型。Python中的元组可以转换为Flink中的Tuple类型,它们包含数量固定的不同类型的域(最多25个)。每个域的元组可以是基本数据类型,也可以是其他的元组类型,从而形成嵌套元组类型。

word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1])

当进行一些要求指定关键字的操作时,例如对数据记录进行分组或配对。通过设定关键字,可以非常便捷地指定元组中各个域的位置。你可以指定多个位置,从而实现复合关键字(更多信息,查阅Section Data Transformations)。

wordCounts \

 .group_by(0) \

 .reduce(MyReduceFunction())

数据源创建了初始的数据集,包括来自文件,以及来自数据接口/集合两种方式。

基于文件的:

read_text(path) – 按行读取文件,并将每一行以String形式返回。 read_csv(path,type) – 解析以逗号(或其他字符)划分数据域的文件。
返回一个包含若干元组的数据集。支持基本的java数据类型作为字段类型。

基于数据集合的:

from_elements(*args) – 基于一系列数据创建一个数据集,包含所有元素。 generate_sequence(from, to) – 按照指定的间隔,生成一系列数据。 Examples
env = get_environment

\# read text file from local files system

localLiens = env.read_text("file:#/path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort

hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants

csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

\# create a set from some given elements

values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence

numbers = env.generate_sequence(1, 10000000)

 

数据池可以接收数据集,并被用来存储或返回它们:

write_text() – 按行以String形式写入数据。可通过对每个数据项调用str()函数获取String。 write_csv(…) – 将元组写入逗号分隔数值文件。行数和数据字段均可配置。每个字段的值可通过对数据项调用str()方法得到。 output() – 在标准输出上打印每个数据项的str()字符串。

一个数据集可以同时作为多个操作的输入数据。程序可以在写入或打印一个数据集的同时,对其进行其他的变换操作。

Examples

标准数据池相关方法示例如下:

write DataSet to a file on the local file system

textData.write_text("file:///my/result/on/localFS")

 write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort

textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataSet to a file and overwrite the file if it exists

textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 tuples as lines with pipe as the separator "a|b|c"

values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines

values.write_text("file:///path/to/the/result/file")

 

使用广播变量,能够在使用普通输入参数的基础上,使得一个数据集同时被多个并行的操作所使用。这对于实现辅助数据集,或者是基于数据的参数化法非常有用。这样,数据集就可以以集合的形式被访问。

注册广播变量:广播数据集可通过调用with_broadcast_set(DataSet,String)函数,按照名字注册广播变量。 访问广播变量:通过对调用self.context.get_broadcast_variable(String)可获取广播变量。
class MapperBcv(MapFunction):

 def map(self, value):

 factor = self.context.get_broadcast_variable("bcv")[0][0]

 return value * factor

# 1. The DataSet to be broadcasted

toBroadcast = env.from_elements(1, 2, 3)

data = env.from_elements("a", "b")

# 2. Broadcast the DataSet

data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

确保在进行广播变量的注册和访问时,应当采用相同的名字(示例中的”bcv”)。
注意:由于广播变量的内容被保存在每个节点的内部存储中,不适合包含过多内容。一些简单的参数,例如标量值,可简单地通过参数化rich function来实现。

该章节将描述如何在Flink中配置程序的并行执行。一个Flink程序可以包含多个任务(操作,数据源和数据池)。一个任务可以被划分为多个可并行运行的部分,每个部分处理输入数据的一个子集。并行运行的实例数量被称作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以为任务指定不同等级的并行度。

运行环境级

Flink程序可在一个运行环境execution environment的上下文中运行。一个运行环境为其中运行的所有操作,数据源和数据池定义了一个默认的并行度。运行环境的并行度可通过对某个操作的并行度进行配置来修改。
一个运行环境的并行度可通过调用set_parallelism()方法来指定。例如,为了将WordCount示例程序中的所有操作,数据源和数据池的并行度设置为3,可以通过如下方式设置运行环境的默认并行度。

env = get_environment()

env.set_parallelism(3)

text.flat_map(lambda x,c: x.lower().split()) \

 .group_by(1) \

 .reduce_group(Adder(), combinable=True) \

 .output()

env.execute()

通过设置位于./conf/flink-conf.yaml.文件的parallelism.default属性,改变系统级的默认并行度,可设置所有运行环境的默认并行度。具体细节可查阅Configuration文档。

为了在Flink中运行计划任务,到Flink目录下,运行/bin文件夹下的pyflink.sh脚本。对于python2.7版本,运行pyflink2.sh;对于python3.4版本,运行pyflink3.sh。包含计划任务的脚本应当作为第一个输入参数,其后可添加一些另外的python包,最后,在“-”之后,输入其他附加参数。

./bin/pyflink 2/3 .sh Script [ pathToPackage1 [ pathToPackageX]][ - param1 [ paramX ]]

转载自 并发编程网 - ifeve.com


如何在 Apache Flink 1.10 中使用 Python UDF? 本文将为大家介绍用户如何定义 UDF,并完整展示了如何安装 PyFlink,如何在 PyFlink 中定义/注册/调用 UDF,以及如何执行作业。
如何在 Apache Flink 1.10 中使用 Python UDF? 在刚刚发布的 ApacheFlink 1.10 中,PyFlink 添加了对 Python UDFs 的支持。这意味着您可以从现在开始用 Python 编写 UDF 并扩展系统的功能。此外,本版本还支持 Python UDF 环境和依赖管理,因此您可以在 UDF 中使用第三方库,从而利用 Python 生态丰富的第三方库资源。
如何在 Apache Flink 中使用 Python API? 为大家介绍 Flink Python API 的现状及未来规划,主要内容包括:Apache Flink Python API 的前世今生和未来发展;Apache Flink Python API 架构及开发环境搭建;Apache Flink Python API 核心算子介绍及应用。
Apache Flink 1.9.0 为什么将支持 Python API ? 众所周知,Apache Flink(以下简称 Flink)的 Runtime 是用 Java 编写的,而即将发布的 Apache Flink 1.9.0 版本则会开启新的 ML 接口和新的 flink-python 模块,Flink 为什么要增加对 Python 的支持,想必大家一定好奇。
别开心太早,Python 官方文档的翻译差远了 近几天,很多公众号发布了 Python 官方文档的消息。然而,一个特别奇怪的现象就发生了,让人啼笑皆非。 Python 文档的中文翻译工作一直是“默默无闻”,几个月前,我还吐槽过这件事《再聊聊Python中文社区的翻译》,当时我们的进度是 10.3%,远远落后于日本和法国,甚至落后于巴西! 这次所谓的中文版,当然是未完成翻译的残品。