Flink(44):Flink之TableAPI和FlinkSQL的案例一
2023-09-14 09:01:24 时间
目录
0. 相关文章链接
1. 需求
将DataStream注册为Table和View并进行SQL统计
2. 代码
package cn.itcast.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQL_Table_Demo01 {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStream<Order> orderA = env.fromCollection(Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));
DataStream<Order> orderB = env.fromCollection(Arrays.asList(
new Order(2L, "pen", 3),
new Order(2L, "rubber", 3),
new Order(4L, "beer", 1)));
//3.注册表
// convert DataStream to Table
Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
// register DataStream as Table
tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
//4.执行查询
System.out.println(tableA);
// union the two tables
Table resultTable = tEnv.sqlQuery(
"SELECT * FROM " + tableA + " WHERE amount > 2 " +
"UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2"
);
//5.输出结果
DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
resultDS.print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
public Long user;
public String product;
public int amount;
}
}
注:其他相关文章链接由此进 -> Flink文章汇总
相关文章
- 使用pip安装软件时各种失败案例
- Ajax基本案例详解之$.get的实现
- Mybatis+mysql动态分页查询数据案例——房屋信息的接口(IHouseDao)
- 性能调优之案例分析
- 数据挖掘案例:基于 ReliefF和K-means算法的应用
- Flink 案例整合
- Flink(22):Flink之Window案例一(基于时间的滚动和滑动窗口)
- ML之R:回归预测任务之模型训练部分代码案例—单个模型推理并输出、各个模型基于单个参数训练调优、选择几个最佳模型再进行交叉训练确保模型稳定性实习代码
- DL之LSTM:LSTM算法论文简介(原理、关键步骤、RNN/LSTM/GRU比较、单层和多层的LSTM)、案例应用之详细攻略
- ML之MF:基于MovieLens电影评分数据集利用基于矩阵分解算法(NMF)实现对用户进行Top5电影推荐案例
- ML之分类预测:分类预测评估指标之AUC计算的的两种函数具体代码案例实现
- 100天精通Python(数据分析篇)——第65天:Pandas聚合操作与案例
- 案例:网页自动化截图接口推送到手机(截图维格表数据)
- 案例精析—2021语言与智能技术竞赛:多形态信息抽取任务
- 【SPSS】多选项分析详细操作教程(附案例实战)
- QT案例实战1 - 从零开始编写一个OCR工具软件 (9) 使用QTcpServer建立一个Tcp服务端
- a22.ansible 生产实战案例 -- haproxy playbook