Apache Flink初体验
文章目录
缘起
我为什么学习apache flink呢?因为下面一件事:
这种事经常出现,flink经常被提及。我不得不注意了,不得不学习了,否则将被市场无情地淘汰。
启动
我开始时试着从官网直接下载1.15版本然后启动发现总是启动失败。第一个难点是1.15版本不再支持Windows系统了。然后我就是用cygwin启动,看着是没问题哈:
但是实际上没有进程,看看日志原来是内存不够大。日志内容是这样的:
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
Improperly specified VM option 'MaxMetaspaceSize=268435456
'
我电脑内存是不够的,然后我把内存调小点,又报这个错:
算了,不折腾内存配置了。然后我就去找个低版本1.7,下载地址为:https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.12.tgz,下载了下来。然后解压执行就完了:
PS E:\flink\flink-1.7.2\bin> .\start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
启动成功后,用浏览器试了下,果然可以。
第一个flink程序
我的第一个flink程序员其实不需要启动flink服务器。这个程序也是抄网上教程的代码,首先是依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
然后是java代码:
package com.youngthing.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* data set api demo
* created at 18/07/2022
*
* @author 醒过来摸鱼
* @since 1.0.0
*/
public class DataSetDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
final DataSource<String> source = env
.fromElements("PC", "Web", "Database", "SQL",
"I love MySQL");
final AggregateOperator<Tuple2<String, Integer>> operator =
source.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
operator.print();
}
public static class LineSplitter implements FlatMapFunction<String,
Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
final String[] strings = s.split(" ");
for (String word: strings) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
执行结果:
(Database,1)
(love,1)
(MySQL,1)
(I,1)
(PC,1)
(SQL,1)
(Web,1)
分析
上述代码中第一个出现的是ExecutionEnvironment。执行环境分本地环境和远程环境,默认是创建本地环境的。远程环境就是启动的flink服务器了。
DataSource是Flink的数据源,创建方式主要有fromElements、generateSequence、fromCollection、readFile等等方式。这些照着英文单词意思理解就行了,fromElements代表来自多个元素,generateSequence代表创建序列,fromCollection代表从集合创建,readFile代表读取文件。
再看flatMap,如果学习过java8,就对map不陌生。Java8的stream api中,提供了一个map方法,是将一个元素转换为另一个元素的。但是这里的flatMap,多了一个flat,英文的意思为“平”。啥意思呢?说白了就是可以将一个转换为多个。Collector负责收集转换后的多条数据。
接下来的groupBy里有个参数,是0,0是代表转换的Tuple的第一个字段,也就是单词字段。这个函数调用就是将拆分成的单词-数字对分组而已。至于sum,按照SQL的组函数理解就很快悟了。
相关文章
- Linux下Apache、PHP、MySQL默认安装路径
- 深入浅出Mesos(六):亲身体会Apache Mesos
- Apache中KeepAlive 配置
- [Django]Windows下Django配置Apache示范设置
- linux下apache服务器的安装、启动、查看
- Apache RocketMQ 入选 SegmentFault 年度中国技术品牌影响力企业榜单!
- Apache Flink 1.10.0 发布 | 云原生生态周报 Vol. 38
- Apache Shiro 身份验证绕过漏洞复现 (cve-2020-1957)
- zookeeper 3.6.2启动报错:找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
- Atitit java zip compress use apache tool jar 压缩的问题 static voidzip(java.lang.String zipFileName,
- java 邮件发送 apache commons-email
- 如何在windows环境中搭建apache+subversion(ZT)
- Mybatis 拦截器报错org.apache.ibatis.executor.statement.StatementHandler.prepare(java.sql.Connection)
- Building real-time dashboard applications with Apache Flink, Elasticsearch, and Kibana
- 配置apache、php、mysql之间的关系
- Apache HTTPD 多后缀解析漏洞