zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Apache Flink初体验

Apacheflink 初体验
2023-09-11 14:21:09 时间

缘起

  我为什么学习apache flink呢?因为下面一件事:
在这里插入图片描述
  这种事经常出现,flink经常被提及。我不得不注意了,不得不学习了,否则将被市场无情地淘汰。

启动

  我开始时试着从官网直接下载1.15版本然后启动发现总是启动失败。第一个难点是1.15版本不再支持Windows系统了。然后我就是用cygwin启动,看着是没问题哈:
在这里插入图片描述
  但是实际上没有进程,看看日志原来是内存不够大。日志内容是这样的:

Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
Improperly specified VM option 'MaxMetaspaceSize=268435456
'

  我电脑内存是不够的,然后我把内存调小点,又报这个错:
在这里插入图片描述
  算了,不折腾内存配置了。然后我就去找个低版本1.7,下载地址为:https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.12.tgz,下载了下来。然后解压执行就完了:

PS E:\flink\flink-1.7.2\bin> .\start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.

  启动成功后,用浏览器试了下,果然可以。
在这里插入图片描述

第一个flink程序

  我的第一个flink程序员其实不需要启动flink服务器。这个程序也是抄网上教程的代码,首先是依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
    </dependencies>

  然后是java代码:

package com.youngthing.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * data set api demo
 * created at 18/07/2022
 *
 * @author 醒过来摸鱼
 * @since 1.0.0
 */
public class DataSetDemo {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();
        final DataSource<String> source = env
                .fromElements("PC", "Web", "Database", "SQL",
                        "I love MySQL");
        final AggregateOperator<Tuple2<String, Integer>> operator =
                source.flatMap(new LineSplitter())
                        .groupBy(0)
                .sum(1);
        operator.print();
    }

    public static class LineSplitter implements FlatMapFunction<String,
            Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            final String[] strings = s.split(" ");
            for (String word: strings) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

  执行结果:

(Database,1)
(love,1)
(MySQL,1)
(I,1)
(PC,1)
(SQL,1)
(Web,1)

分析

  上述代码中第一个出现的是ExecutionEnvironment。执行环境分本地环境和远程环境,默认是创建本地环境的。远程环境就是启动的flink服务器了。
  DataSource是Flink的数据源,创建方式主要有fromElements、generateSequence、fromCollection、readFile等等方式。这些照着英文单词意思理解就行了,fromElements代表来自多个元素,generateSequence代表创建序列,fromCollection代表从集合创建,readFile代表读取文件。
  再看flatMap,如果学习过java8,就对map不陌生。Java8的stream api中,提供了一个map方法,是将一个元素转换为另一个元素的。但是这里的flatMap,多了一个flat,英文的意思为“平”。啥意思呢?说白了就是可以将一个转换为多个。Collector负责收集转换后的多条数据。
  接下来的groupBy里有个参数,是0,0是代表转换的Tuple的第一个字段,也就是单词字段。这个函数调用就是将拆分成的单词-数字对分组而已。至于sum,按照SQL的组函数理解就很快悟了。