zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

4种方式优化你的 Flink 应用程序

2023-04-18 12:54:41 时间

原文翻译自 DZone,根据原文意译。

PS: 腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。

Apache Flink 是一个流式数据处理框架。阅读文章以了解如何使您的 Flink 应用程序运行的更快!

Flink 是一个复杂的框架,并提供了许多方法来调整其执行。在本文中,我将展示四种不同的方法来提高 Flink 应用程序的性能。

如果您不熟悉 Flink,您可以阅读其他介绍性文章,如thisthisthis。但是如果你已经熟悉 Apache Flink,这篇文章将帮助你让你的应用程序运行地更快一点。

一、使用 Flink 元组

当你使用groupByjoin、 或keyBy等操作时,Flink 为您提供了许多方式来选择数据集中的键。您可以使用键选择器功能:

// Join movies and ratings datasets
movies.join(ratings)
        // Use movie id as a key in both cases
        .where(new KeySelector<Movie, String>() {
            @Override
            public String getKey(Movie m) throws Exception {
                return m.getId();
            }
        })
        .equalTo(new KeySelector<Rating, String>() {
            @Override
            public String getKey(Rating r) throws Exception {
                return r.getMovieId();
            }
        })

或者您可以在 POJO 类型中指定字段名称:

movies.join(ratings)
    // Use same fields as in the previous example
    .where("id")
    .equalTo("movieId")

但是如果你正在使用 Flink 元组类型,你可以直接指定将用作键的字段元组的位置:

DataSet<Tuple2<String, String>> movies = ...
DataSet<Tuple3<String, String, Double>> ratings = ...

movies.join(ratings)
    // Specify fields positions in tuples
    .where(0)
    .equalTo(1)

最后一个选项将为您提供最佳性能,但可读性如何?这是否意味着您的代码现在看起来像下面这样?

DataSet<Tuple3<Integer, String, Double>> result = movies.join(ratings)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
        // What is happening here?
        @Override
        public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
            // Some tuples are joined with some other tuples and some fields are returned???
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    });

在这种情况下,提高可读性的一个常见习惯用法是创建一个类,该类继承自其中一个TupleX类并为这些字段实现 getter 和 setter。这是Edge类,源于 Flink Gelly 库中的一个类,它包含三个类并扩展了Tuple3该类:

public class Edge<K, V> extends Tuple3<K, K, V> {

    public Edge(K source, K target, V value) {
        this.f0 = source;
        this.f1 = target;
        this.f2 = value;
    }
    
    // Getters and setters for readability
    public void setSource(K source) {
        this.f0 = source;
    }

    public K getSource() {
        return this.f0;
    }
    
    // Also has getters and setters for other fields
    ...
}

二、重用 Flink 对象

另一个可以用来提高 Flink 应用程序性能的选项是在从用户自定义的函数返回数据时使用可变对象。看看这个例子:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    }

apply函数的每次执行中我们可以看到,创建了Tuple2类的一个新实例,这会增加垃圾收集器的压力。解决此问题的一种方法是重复的使用同一个实例:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    }

上面的例子优化了一点。我们Tuple2在每次调用时都会创建一个新实例,但我们仍然间接地创建了Long类的一个实例。为了解决这个问题,Flink 有许多所谓的值类:IntValueLongValueStringValueFloatValue等。 这些类的目的是提供内置类型的可变版本,以便我们可以在用户定义的函数中重用它们。以下是我们如何使用它们:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create a mutable count instance
        private LongValue count = new IntValue();
        // Assign mutable count to the tuple
        private Tuple2<String, LongValue> result = new Tuple<>("", count);
    
        @Override
        // Notice that now we have a different return type
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Update mutable count value
            count.setValue(changesCount);
            
            // Reuse the same tuple and the same LongValue instance
            collector.collect(result);
        }
    }

这个习惯用法在 Flink 库中常用,比如 Flink Gelly。

三、使用函数注解

优化 Flink 应用程序的另一种方法是提供一些有关用户自定义函数对输入数据执行的操作的信息。当Flink 无法解析和理解代码,您可以提供有助于构建更高效执行计划的关键信息。我们可以以下使用三种注解来实现:

  1. @ForwardedFields:指定输入值中的哪些字段保持不变并用于输出值。
  2. @NotForwardedFields:在输出的相同位置指定未保留的字段。
  3. @ReadFields:指定用于计算结果值的字段。您应该只指定在计算中使用的字段,而不仅仅是复制到输出中。

让我们来看看我们如何使用ForwardedFields注解:

// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
       // Copy first field without change
        return new Tuple2<>(value.f0, value.f1 + 123);
    }
}

这意味着输入元组中的第一个元素没有被改变,它在相同的位置返回。

如果您不更改字段而只是将其移动到不同的位置,也可以使用ForwardedFields注释指定这一点。在下一个示例中,我们交换输入元组中的字段并警告 Flink:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction<Tuple2<Long, Double>, Tuple2<Double, Long>> {
    @Override
    public Tuple2<Double, Long> map(Tuple2<Long, Double> value) {
       // Swap elements in a tuple
        return new Tuple2<>(value.f1, value.f0);
    }
}

上面提到的注解只能应用于具有一个输入参数的函数,例如mapflatMap。如果您有两个输入参数,则可以使用分别提供有关第一个和第二个参数的信息的ForwardedFieldsFirstForwardedFieldsSecond注释。

以下是我们如何在JoinFunction接口的实现中使用这些注释:

// Two fields from the input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The third field from the input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("2")
class MyJoin implements JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
    @Override
    public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
})

Flink 还为类似的目的提供了NotForwardedFieldsFirstNotForwardedFieldsSecondReadFieldsFirstReadFirldsSecond注解。

四、选择 Join 类型

如果你给 Flink 另一个提示,你可以让你的 join 更快,但在我们讨论它为什么工作之前,让我们先谈谈 Flink 是如何执行 join 的。

Flink 在处理批处理数据时,集群中的每台机器都会存储部分数据。为了执行连接,Apache Flink 需要找到满足连接条件的所有两个数据集对。为此,Flink 首先必须将具有相同键的两个数据集中的项目放在集群中的同一台机器上。为此,有两种策略:

  1. Repartition-repartition 策略:在这种情况下,两个数据集都按其键进行分区并通过网络发送。这意味着如果数据集很大,则可能需要花费大量时间将它们复制到网络中。
  2. Broadcast-forward 策略:在这种情况下,一个数据集保持不变,但第二个数据集被复制到集群中包含第一个数据集一部分的每台机器。

如果您将一个小数据集与一个很大的数据集连接起来,您可以使用 broadcast-forward 策略并避免对第一个数据集进行昂贵的分区的代价。这真的很容易做到:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST) 

这暗示第一个数据集比第二个数据集小得多。

您还可以使用其他连接提示:

  • BROADCAST_HASH_SECOND: 第二个数据集要小得多
  • REPARTITION_HASH_FIRST: 第一个数据集小一点
  • REPARTITION_HASH_SECOND: 第二个数据集小一点
  • REPARTITION_SORT_MERGE: 重新分区两个数据集并使用排序和合并策略
  • OPTIMIZER_CHOOSES:  Flink 优化器将决定如何 Join 数据集

您可以在本文中阅读有关 Flink 如何执行连接的更多信息。

更多信息

我希望你喜欢这篇文章并发现它很有用。

近期我会写更多关于Flink的文章,敬请期待!您可以在此处阅读我的其他文章,也可以查看我的 Pluralsight 课程,其中我更详细地介绍了 Apache Flink:了解 Apache Flink。这是本课程的简短预览