zl程序教程

您现在的位置是:首页 >  Python

当前栏目

《Flink官方文档》Python 编程指南测试版(二)

2023-03-14 22:29:55 时间

为元组定义keys

最简单的情形是对一个数据集中的元组按照一个或多个域进行分组:

reduced = data \
  .group_by(0) \
  .reduce_group(<do something>)

数据集中的元组被按照第一个域分组。对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值。

grouped = data \
  .group_by(0,1) \
  .reduce(/*do something*/)

在上面的例子中,数据集的分组基于第一个和第二个域形成的复合关键字,因此,reduce函数输入数据组中,每个元组两个域的值均相同。
关于嵌套元组需要注意:如果你有一个使用了嵌套元组的数据集,指定group_by(<index of tuple>)操作,系统将把整个元组作为关键字使用。

向Flink传递函数

一些特定的操作需要采用用户自定义的函数,因此它们都接受lambda表达式和rich functions作为输入参数。

data.filter(lambda x: x > 5)
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data.filter(Filter())

Rich functions可以将函数作为输入参数,允许使用broadcast-variables(广播变量),能够由init()函数参数化,是复杂函数的一个可考虑的实现方式。它们也是在reduce操作中,定义一个可选的combine function的唯一方式。
Lambda表达式可以让函数在一行代码上实现,非常便捷。需要注意的是,如果某个操作会返回多个数值,则其使用的lambda表达式应当返回一个迭代器。(所有函数将接收一个collector输入 参数)。

数据类型

Flink的Python API目前仅支持python中的基本数据类型(int,float,bool,string)以及byte arrays。
运行环境对数据类型的支持,包括序列化器serializer,反序列化器deserializer,以及自定义类型的类。

class MyObj(object):
    def __init__(self, i):
        self.value = i


class MySerializer(object):
    def serialize(self, value):
        return struct.pack(">i", value.value)


class MyDeserializer(object):
    def _deserialize(self, read):
        i = struct.unpack(">i", read(4))[0]
        return MyObj(i)


env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

Tuples/Lists

你可以使用元组(或列表)来表示复杂类型。Python中的元组可以转换为Flink中的Tuple类型,它们包含数量固定的不同类型的域(最多25个)。每个域的元组可以是基本数据类型,也可以是其他的元组类型,从而形成嵌套元组类型。

word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1])

当进行一些要求指定关键字的操作时,例如对数据记录进行分组或配对。通过设定关键字,可以非常便捷地指定元组中各个域的位置。你可以指定多个位置,从而实现复合关键字(更多信息,查阅Section Data Transformations)。

wordCounts \
    .group_by(0) \
    .reduce(MyReduceFunction())

数据源

数据源创建了初始的数据集,包括来自文件,以及来自数据接口/集合两种方式。

基于文件的:

  • read_text(path) – 按行读取文件,并将每一行以String形式返回。
  • read_csv(path,type) – 解析以逗号(或其他字符)划分数据域的文件。
    返回一个包含若干元组的数据集。支持基本的java数据类型作为字段类型。

基于数据集合的:

  • from_elements(*args) – 基于一系列数据创建一个数据集,包含所有元素。
  • generate_sequence(from, to) – 按照指定的间隔,生成一系列数据。

Examples

env  = get_environment

\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)

 

数据池

数据池可以接收数据集,并被用来存储或返回它们:

  • write_text() – 按行以String形式写入数据。可通过对每个数据项调用str()函数获取String。
  • write_csv(…) – 将元组写入逗号分隔数值文件。行数和数据字段均可配置。每个字段的值可通过对数据项调用str()方法得到。
  • output() – 在标准输出上打印每个数据项的str()字符串。

一个数据集可以同时作为多个操作的输入数据。程序可以在写入或打印一个数据集的同时,对其进行其他的变换操作。

Examples

标准数据池相关方法示例如下:

write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")

 write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")

 

广播变量

使用广播变量,能够在使用普通输入参数的基础上,使得一个数据集同时被多个并行的操作所使用。这对于实现辅助数据集,或者是基于数据的参数化法非常有用。这样,数据集就可以以集合的形式被访问。

  • 注册广播变量:广播数据集可通过调用with_broadcast_set(DataSet,String)函数,按照名字注册广播变量。
  • 访问广播变量:通过对调用self.context.get_broadcast_variable(String)可获取广播变量。
class MapperBcv(MapFunction):
    def map(self, value):
        factor = self.context.get_broadcast_variable("bcv")[0][0]
        return value * factor

# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")

# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

确保在进行广播变量的注册和访问时,应当采用相同的名字(示例中的”bcv”)。
注意:由于广播变量的内容被保存在每个节点的内部存储中,不适合包含过多内容。一些简单的参数,例如标量值,可简单地通过参数化rich function来实现。

并行执行

该章节将描述如何在Flink中配置程序的并行执行。一个Flink程序可以包含多个任务(操作,数据源和数据池)。一个任务可以被划分为多个可并行运行的部分,每个部分处理输入数据的一个子集。并行运行的实例数量被称作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以为任务指定不同等级的并行度。

运行环境级

Flink程序可在一个运行环境execution environment的上下文中运行。一个运行环境为其中运行的所有操作,数据源和数据池定义了一个默认的并行度。运行环境的并行度可通过对某个操作的并行度进行配置来修改。
一个运行环境的并行度可通过调用set_parallelism()方法来指定。例如,为了将WordCount示例程序中的所有操作,数据源和数据池的并行度设置为3,可以通过如下方式设置运行环境的默认并行度。

env = get_environment()
env.set_parallelism(3)

text.flat_map(lambda x,c: x.lower().split()) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()

系统级

通过设置位于./conf/flink-conf.yaml.文件的parallelism.default属性,改变系统级的默认并行度,可设置所有运行环境的默认并行度。具体细节可查阅Configuration文档。

执行方法

为了在Flink中运行计划任务,到Flink目录下,运行/bin文件夹下的pyflink.sh脚本。对于python2.7版本,运行pyflink2.sh;对于python3.4版本,运行pyflink3.sh。包含计划任务的脚本应当作为第一个输入参数,其后可添加一些另外的python包,最后,在“-”之后,输入其他附加参数。

./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]