zl程序教程

您现在的位置是:首页 >  其他

当前栏目

六大方法彻底解决Flink Table & SQL维表Join

2023-03-14 22:45:59 时间

随着 Flink Table & SQL 的发展,使用 Flink SQL 进行维表Join也成为了很多场景的选择。

基于之前的总结,再次总结下Flink Table & SQL 中维表Join的实现方式,包括DataStream中的维表Join。

  • 定时加载维度数据
  • Distributed Cache(分布式缓存)
  • Async IO(异步IO)
  • Broadcast State(广播状态)
  • UDTF + LATERAL TABLE语法
  • LookupableTableSource

定时加载维度数据

实现方式

  • 实现RichFlatMapFunction, 在open()方法中起个线程定时读取维度数据并加载到内存。
  • 在flatMap()方法中实现维度关联。

代码示例

package com.bigdata.flink.dimJoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Joins each {@link UserBrowseLog} with user dimension data that is kept in memory and
 * periodically reloaded from the {@code t_user_info} table over JDBC.
 *
 * <p>The table is read once synchronously in {@link #open(Configuration)} and then re-read
 * every {@code reloadInterval} seconds by a daemon timer thread. Every reload builds a fresh
 * map and publishes it through a volatile field, so {@code flatMap} never reads a map that is
 * being modified, and rows removed from the table disappear on the next successful reload.
 */
@Slf4j
public class DimRichFlatMapFunction extends RichFlatMapFunction<UserBrowseLog, Tuple2<UserBrowseLog, UserInfo>> {

    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String DIM_QUERY = "select uid,name,age,address from t_user_info";

    private final String url;
    private final String user;
    private final String passwd;
    private final Integer reloadInterval;

    // Runtime-only resources: created in open(), never shipped with the serialized function.
    private transient Connection connection;
    private transient Timer timer;
    // Guarded by "this"; stops a late timer run from reconnecting after close().
    private transient boolean closed;

    // Replaced wholesale by the reload thread; volatile makes the new snapshot visible to flatMap().
    volatile HashMap<String, UserInfo> dimInfo = new HashMap<>();

    /**
     * @param url            JDBC URL of the database holding the dimension table
     * @param user           database user
     * @param passwd         database password
     * @param reloadInterval reload period in seconds
     */
    public DimRichFlatMapFunction(String url, String user, String passwd, Integer reloadInterval) {
        this.url = url;
        this.user = user;
        this.passwd = passwd;
        this.reloadInterval = reloadInterval;
    }

    /**
     * Loads the JDBC driver, performs the first dimension load and schedules the periodic reload.
     *
     * @param parameters Flink configuration passed to the rich function
     * @throws Exception if the JDBC driver class cannot be loaded
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(JDBC_DRIVER);

        // Load once before any record is processed so early records are not dropped
        // just because the background thread has not run yet.
        reloadDimensions();

        TimerTask reloadTask = new TimerTask() {
            @Override
            public void run() {
                reloadDimensions();
            }
        };

        // Daemon thread: it must not keep the JVM alive after the task has finished.
        timer = new Timer("dim-reload-timer", true);
        // Long arithmetic avoids int overflow for large intervals.
        long periodMillis = reloadInterval * 1000L;
        timer.scheduleAtFixedRate(reloadTask, periodMillis, periodMillis);
    }

    /**
     * Reads the whole dimension table into a new map and publishes it.
     * Failures are logged and the previously loaded snapshot stays in use.
     */
    private synchronized void reloadDimensions() {
        if (closed) {
            return;
        }
        try {
            if (connection == null || connection.isClosed()) {
                log.warn("No connection. Trying to reconnect...");
                connection = DriverManager.getConnection(url, user, passwd);
            }

            HashMap<String, UserInfo> fresh = new HashMap<>();
            try (PreparedStatement statement = connection.prepareStatement(DIM_QUERY);
                 ResultSet resultSet = statement.executeQuery()) {
                while (resultSet.next()) {
                    UserInfo userInfo = new UserInfo();
                    userInfo.setUid(resultSet.getString("uid"));
                    userInfo.setName(resultSet.getString("name"));
                    userInfo.setAge(resultSet.getInt("age"));
                    userInfo.setAddress(resultSet.getString("address"));

                    fresh.put(userInfo.getUid(), userInfo);
                }
            }
            // Publish only after a complete, successful read.
            dimInfo = fresh;
        } catch (SQLException | RuntimeException e) {
            // An exception escaping a TimerTask would kill the timer thread for good.
            log.error("Get dimension data exception...", e);
        }
    }

    /**
     * Stops the reload timer and closes the JDBC connection.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (timer != null) {
            timer.cancel();
        }
        // Taking the lock waits for a reload that is still running before closing its connection.
        synchronized (this) {
            closed = true;
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Emits the input paired with its dimension record; inputs without a match are dropped.
     *
     * @param value browse log record to enrich
     * @param out   collector receiving the joined tuple
     * @throws Exception declared by the Flink interface
     */
    @Override
    public void flatMap(UserBrowseLog value, Collector<Tuple2<UserBrowseLog, UserInfo>> out) throws Exception {
        UserInfo dim = dimInfo.get(value.getUserID());
        if (dim != null) {
            out.collect(new Tuple2<>(value, dim));
        }
    }
}

  • 注意
  1. 由于数据会存储在内存中,因此,仅支持小数据量维表。
  2. 定时加载,仅适用于更新不太频繁的维表。

Distributed Cache(分布式缓存)

实现方式

  1. 通过env.registerCachedFile(cachedFilePath, cachedFileName)注册本地或HDFS缓存文件。
  2. 程序启动时,Flink会自动将文件分发到TaskManager文件系统中。
  3. 实现RichFlatMapFunction,在open()方法中通过RuntimeContext获取缓存文件并解析。
  4. 解析后的数据在内存中,此时可在flatMap()方法中实现维度关联。

代码示例

package com.bigdata.flink.dimJoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.io.File;
import java.util.HashMap;
import java.util.List;

/**
 * Dimension join backed by Flink's distributed cache: a small {@code id,age} file is
 * registered with the environment, parsed in {@code open()}, and looked up per record.
 */
@Slf4j
public class DistributedCacheJoinDim {

    /**
     * Builds and runs the demo job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the cached file, e.g. file:///some/path or hdfs://host:port/and/path
        String cachedFilePath = "./user_info.txt";
        String cachedFileName = "user_info";
        env.registerCachedFile(cachedFilePath, cachedFileName);

        // Demo input stream of (userId, eventType)
        DataStreamSource<Tuple2<String, String>> stream = env.fromElements(
                Tuple2.of("1", "click"),
                Tuple2.of("2", "click"),
                Tuple2.of("3", "browse"));

        // Join with the dimension data
        SingleOutputStreamOperator<String> dimedStream = stream.flatMap(new RichFlatMapFunction<Tuple2<String, String>, String>() {

            // userId -> age, filled once in open()
            HashMap<String, Integer> dimInfo = new HashMap<>();

            /** Reads the cached file and parses each {@code id,age} line into the lookup map. */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File cachedFile = getRuntimeContext().getDistributedCache().getFile(cachedFileName);
                // Explicit charset: the one-argument overload uses the platform default.
                List<String> lines = FileUtils.readLines(cachedFile, "UTF-8");
                for (String line : lines) {
                    if (line.trim().isEmpty()) {
                        continue; // tolerate blank lines, e.g. a trailing newline
                    }
                    String[] split = line.split(",");
                    if (split.length < 2) {
                        log.warn("Skipping malformed dimension line: {}", line);
                        continue;
                    }
                    try {
                        dimInfo.put(split[0].trim(), Integer.valueOf(split[1].trim()));
                    } catch (NumberFormatException e) {
                        log.warn("Skipping dimension line with non-numeric age: {}", line, e);
                    }
                }
            }

            /** Emits {@code id,eventType,age} for known ids; unknown ids are dropped. */
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                Integer age = dimInfo.get(value.f0);
                if (age != null) {
                    out.collect(value.f0 + "," + value.f1 + "," + age);
                }
            }
        });

        dimedStream.print();

        env.execute();
    }
}

  • 注意
  1. 由于数据会存储在内存中,因此,仅支持小数据量维表。
  2. 启动时加载,在维表变化时,需要重启任务。

Distributed Cache(分布式缓存)

实现方式

  1. 通过env.registerCachedFile(cachedFilePath, cachedFileName)注册本地或HDFS缓存文件。
  2. 程序启动时,Flink会自动将文件分发到TaskManager文件系统中。
  3. 实现RichFlatMapFunction,在open()方法中通过RuntimeContext获取缓存文件并解析。
  4. 解析后的数据在内存中,此时可在flatMap()方法中实现维度关联。

代码示例

package com.bigdata.flink.dimJoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.io.File;
import java.util.HashMap;
import java.util.List;

/**
 * Author: Wang Pei
 * Summary:
 * Dimension join implemented with Flink's distributed cache. A small {@code id,age} file is
 * registered with the environment, parsed in {@code open()}, and looked up per record.
 */
@Slf4j
public class DistributedCacheJoinDim {

    /**
     * Builds and runs the demo job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the cached file, e.g. file:///some/path or hdfs://host:port/and/path
        String cachedFilePath = "./user_info.txt";
        String cachedFileName = "user_info";
        env.registerCachedFile(cachedFilePath, cachedFileName);

        // Demo input stream of (userId, eventType)
        DataStreamSource<Tuple2<String, String>> stream = env.fromElements(
                Tuple2.of("1", "click"),
                Tuple2.of("2", "click"),
                Tuple2.of("3", "browse"));

        // Join with the dimension data
        SingleOutputStreamOperator<String> dimedStream = stream.flatMap(new RichFlatMapFunction<Tuple2<String, String>, String>() {

            // userId -> age, filled once in open()
            HashMap<String, Integer> dimInfo = new HashMap<>();

            /** Loads the cached file into the lookup map, skipping lines that cannot be parsed. */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File cachedFile = getRuntimeContext().getDistributedCache().getFile(cachedFileName);
                // Read as UTF-8 explicitly instead of relying on the platform default charset.
                List<String> lines = FileUtils.readLines(cachedFile, "UTF-8");
                for (String line : lines) {
                    if (line.trim().isEmpty()) {
                        continue; // blank line, e.g. a trailing newline
                    }
                    String[] split = line.split(",");
                    if (split.length < 2) {
                        log.warn("Skipping malformed dimension line: {}", line);
                        continue;
                    }
                    try {
                        dimInfo.put(split[0].trim(), Integer.valueOf(split[1].trim()));
                    } catch (NumberFormatException e) {
                        log.warn("Skipping dimension line with non-numeric age: {}", line, e);
                    }
                }
            }

            /** Emits {@code id,eventType,age} for known ids; unknown ids are dropped. */
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                Integer age = dimInfo.get(value.f0);
                if (age != null) {
                    out.collect(value.f0 + "," + value.f1 + "," + age);
                }
            }
        });

        dimedStream.print();

        env.execute();
    }
}

  • 注意
  1. 由于数据会存储在内存中,因此,仅支持小数据量维表。
  2. 启动时加载,在维表变化时,需要重启任务。

Async IO(异步IO)

实现方式

  1. 维度数据在外部存储中,如ES、Redis、HBase中。
  2. 通过异步IO查询维度数据
  3. 结合本地缓存如Guava Cache 减少对外部存储的访问。

代码示例


package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.*;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Joins a Kafka event stream with a dimension table through Flink Async I/O.
 */
public class FlinkAsyncIO {

    /**
     * Wires the job: Kafka source, JSON extraction, filtering of unparsable records,
     * asynchronous enrichment via {@code ElasticsearchAsyncFunction}, console output.
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception{

        // Command-line arguments
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String bootstrapServers = params.get("kafka.bootstrap.servers");
        final String groupId = params.get("kafka.group.id");
        final String autoOffsetReset = params.get("kafka.auto.offset.reset");
        final String topic = params.get("kafka.topic");
        final int sourceParallelism = params.getInt("kafka.parallelism");

        final String esHost = params.get("es.host");
        final Integer esPort = params.getInt("es.port");
        final String esUser = params.get("es.user");
        final String esPassword = params.get("es.password");
        final String esIndex = params.get("es.index");
        final String esType = params.get("es.type");

        // Local DataStream environment with the web UI enabled
        final Configuration flinkConf = new Configuration();
        flinkConf.setInteger(RestOptions.PORT,8081);
        flinkConf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);

        // Kafka source
        final Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", groupId);
        consumerProps.put("auto.offset.reset", autoOffsetReset);
        final FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProps);
        consumer.setCommitOffsetsOnCheckpoints(true);
        final SingleOutputStreamOperator<String> rawEvents = env
                .addSource(consumer)
                .name("KafkaSource")
                .setParallelism(sourceParallelism);

        // JSON string -> (userID, eventTime, eventType, productID)
        final SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> parsedEvents = rawEvents
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) FlinkAsyncIO::parseEvent)
                .returns(new TypeHint<Tuple4<String, String, String, Integer>>(){})
                .name("Map: ExtractTransform");

        // Records that failed to parse carry a null productID; drop them
        final SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> validEvents = parsedEvents
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) event -> event.f3 != null)
                .name("Filter: FilterExceptionData");

        // Async I/O join. Arguments after the function: timeout (500 ms) and capacity
        // (at most 10 in-flight requests). A request that times out is handled by
        // AsyncFunction#timeout.
        final SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> joined = AsyncDataStream
                .orderedWait(
                        validEvents,
                        new ElasticsearchAsyncFunction(esHost, esPort, esUser, esPassword, esIndex, esType),
                        500,
                        TimeUnit.MILLISECONDS,
                        10)
                .name("Join: JoinWithDim");

        // Output
        joined.print().name("PrintToConsole");
        env.execute();
    }

    /**
     * Extracts the event fields from one JSON record. When parsing fails the stack trace is
     * printed and the tuple is returned with whatever fields were set before the failure.
     */
    private static Tuple4<String, String, String, Integer> parseEvent(String json) {
        Tuple4<String, String, String, Integer> event = new Tuple4<>();
        try {
            JSONObject parsed = JSON.parseObject(json);
            event.f0 = parsed.getString("userID");
            event.f1 = parsed.getString("eventTime");
            event.f2 = parsed.getString("eventType");
            event.f3 = parsed.getInteger("productID");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return event;
    }
}

其中的 ElasticsearchAsyncFunction:

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * Async function that enriches an event tuple with the user's {@code age}, looked up by
 * document id in Elasticsearch and cached locally in a Guava cache.
 *
 * <p>Input: {@code (userID, eventTime, eventType, productID)}.
 * Output: the same four fields plus the age, which is {@code null} when no document was
 * found, the lookup failed, or the request timed out.
 */
public class ElasticsearchAsyncFunction extends RichAsyncFunction<Tuple4<String, String, String, Integer>, Tuple5<String, String, String, Integer,Integer>> {

    // NOTE(review): a two-entry cache looks like a demo setting; size it for the real key space.
    private static final long CACHE_MAX_SIZE = 2;
    private static final long CACHE_EXPIRE_MINUTES = 5;

    private final String host;

    private final Integer port;

    private final String user;

    private final String password;

    private final String index;

    private final String type;

    // Runtime-only resources created in open(); they must not be part of the serialized function.
    private transient RestHighLevelClient restHighLevelClient;

    private transient Cache<String,Integer> cache;

    /**
     * @param host     Elasticsearch host
     * @param port     Elasticsearch HTTP port
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @param index    index holding the dimension documents
     * @param type     document type within the index
     */
    public ElasticsearchAsyncFunction(String host, Integer port, String user, String password, String index, String type) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.index = index;
        this.type = type;
    }

    /**
     * Creates the Elasticsearch client and the local cache.
     *
     * @param parameters Flink configuration passed to the rich function
     */
    @Override
    public void open(Configuration parameters){

        // ES client with basic authentication
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        restHighLevelClient = new RestHighLevelClient(
                RestClient
                        .builder(new HttpHost(host, port))
                        .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));

        // Local cache in front of ES
        cache=CacheBuilder.newBuilder()
                .maximumSize(CACHE_MAX_SIZE)
                .expireAfterAccess(CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES)
                .build();
    }

    /**
     * Closes the Elasticsearch client if it was created.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void close() throws Exception {
        // open() may have failed before the client was assigned.
        if (restHighLevelClient != null) {
            restHighLevelClient.close();
        }
    }


    /**
     * Serves the lookup from the cache when possible, otherwise queries Elasticsearch.
     *
     * @param input        event to enrich
     * @param resultFuture future to complete with the enriched event
     */
    @Override
    public void asyncInvoke(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {

        // 1. Try the cache first
        Integer cachedValue = cache.getIfPresent(input.f0);
        if(cachedValue !=null){
            System.out.println("从缓存中获取到维度数据: key="+input.f0+",value="+cachedValue);
            resultFuture.complete(Collections.singleton(toOutput(input, cachedValue)));

        // 2. Cache miss: fetch from the external store
        }else {
            searchFromES(input,resultFuture);
        }
    }

    /**
     * Queries Elasticsearch asynchronously for the dimension value of {@code input.f0} and
     * completes {@code resultFuture} exactly once on every code path of the callback.
     *
     * @param input        event to enrich
     * @param resultFuture future to complete with the enriched event
     */
    private void searchFromES(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture){

        // 1. Output record; the age stays null unless the lookup succeeds
        Tuple5<String, String, String, Integer, Integer> output = toOutput(input, null);

        // 2. Key to look up
        String dimKey = input.f0;

        // 3. Ids query
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(index);
        searchRequest.types(type);
        searchRequest.source(SearchSourceBuilder.searchSource().query(QueryBuilders.idsQuery().addIds(dimKey)));

        // 4. Asynchronous query
        restHighLevelClient.searchAsync(searchRequest, new ActionListener<SearchResponse>() {

            // Successful response
            @Override
            public void onResponse(SearchResponse searchResponse) {
                try {
                    SearchHit[] searchHits = searchResponse.getHits().getHits();
                    if(searchHits.length >0 ){
                        JSONObject obj = JSON.parseObject(searchHits[0].getSourceAsString());
                        Integer dimValue = obj == null ? null : obj.getInteger("age");
                        output.f4=dimValue;
                        // Guava caches reject null values; caching one would throw and
                        // leave the future uncompleted.
                        if (dimValue != null) {
                            cache.put(dimKey,dimValue);
                            System.out.println("将维度数据放入缓存: key="+dimKey+",value="+dimValue);
                        }
                    }
                } catch (RuntimeException e) {
                    // A malformed document must not prevent completion of the future.
                    output.f4=null;
                    System.err.println("解析维度数据失败: key="+dimKey+",error="+e);
                }

                resultFuture.complete(Collections.singleton(output));
            }

            // Failed response: emit the event without a dimension value
            @Override
            public void onFailure(Exception e) {
                System.err.println("从ES查询维度数据失败: key="+dimKey+",error="+e);
                output.f4=null;
                resultFuture.complete(Collections.singleton(output));
            }
        });

    }

    /**
     * Called by Flink when the request exceeded the configured timeout. The event is emitted
     * without a dimension value instead of issuing a second query: a retry started here would
     * have no timeout of its own, so a stalled Elasticsearch could leave the future
     * uncompleted and keep occupying one of the operator's in-flight slots.
     *
     * @param input        event whose lookup timed out
     * @param resultFuture future to complete with the (unenriched) event
     */
    @Override
    public void timeout(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
        resultFuture.complete(Collections.singleton(toOutput(input, null)));
    }

    /** Copies the four event fields and appends the given dimension value (may be null). */
    private static Tuple5<String, String, String, Integer, Integer> toOutput(Tuple4<String, String, String, Integer> input, Integer dimValue) {
        Tuple5<String, String, String, Integer, Integer> output = new Tuple5<>();
        output.f0=input.f0;
        output.f1=input.f1;
        output.f2=input.f2;
        output.f3=input.f3;
        output.f4=dimValue;
        return output;
    }
}
  • 注意
  1. 此方式不受限于内存,可支持数据量较大的维度数据。
  2. 需要外部存储支持。
  3. 应尽量减少对外部存储访问。

Broadcast State

实现方式

  1. 将维度数据发送到Kafka作为流S1。事实数据是流S2。
  2. 定义状态描述符MapStateDescriptor,如descriptor。
  3. 结合状态描述符,将S1广播出去,如S1.broadcast(descriptor),形成广播流(BroadcastStream) B1。
  4. 事实流S2和广播流B1连接,形成连接后的流BroadcastConnectedStream BC。
  5. 基于BC流,在KeyedBroadcastProcessFunction/BroadcastProcessFunction中实现Join的逻辑处理。

代码示例

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Uses Broadcast State to update a user configuration at runtime: events whose user id is
 * present in the configuration are enriched with the user's name and age, all others are
 * dropped.
 */
@Slf4j
public class TestBroadcastState {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments; {@code --applicationProperties} must point to the properties file
     * @throws Exception if configuration loading or job execution fails
     */
    public static void main(String[] args) throws Exception{

        // 1. Parse command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        // Checkpoint settings
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        // Event stream settings
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        // Configuration stream settings
        String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
        int fromMysqlPort = parameterTool.getInt("fromMysql.port");
        String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
        String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
        String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
        int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");

        // 2. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State backend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        // Checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 3. Kafka event stream
        // Each event says that a user browsed or clicked a product at some time, e.g.
        // {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
        kafkaProperties.put("group.id",fromKafkaGroupID);

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setStartFromLatest();
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");

        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, String, Integer>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out){
                try {
                    JSONObject obj = JSON.parseObject(value);
                    String userID = obj.getString("userID");
                    String eventTime = obj.getString("eventTime");
                    String eventType = obj.getString("eventType");
                    int productID = obj.getIntValue("productID");
                    out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
                }catch (Exception ex){
                    log.warn("异常数据:{}",value,ex);
                }
            }
        });

        // 4. MySQL configuration stream
        // Custom source that periodically reads the configuration from MySQL so it can be broadcast.
        // Row layout: user id, user name, user age
        DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));

        /*
          (1) Create the MapStateDescriptor.
          It defines the state name and the key/value types: the key is Void and the value is
          Map<String, Tuple2<String,Integer>>.
         */
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /*
          (2) Broadcast the configuration stream to obtain a BroadcastStream.
         */
        BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);

        // 5. Connect the event stream with the broadcast configuration stream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);

        // 6. Process events according to the broadcast configuration
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());

        // 7. Print the result
        resultStream.print();

        // 8. Build the JobGraph and start execution
        env.execute();

    }

    /**
     * Processes an event only when its user id appears in the configuration, adding the
     * user's basic information to it.
     * Tuple4<String, String, String, Integer>: type of the first (event) stream
     * HashMap<String, Tuple2<String, Integer>>: type of the second (configuration) stream
     * Tuple6<String, String, String, Integer,String, Integer>: output type
     */
    static class CustomBroadcastProcessFunction extends BroadcastProcessFunction<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>{

        /** State descriptor; uses the same name and types as the one passed to broadcast() in main. */
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /**
         * Reads the broadcast state and enriches the event with it.
         * @param value event from the event stream
         * @param ctx read-only context giving access to the broadcast state
         * @param out collector for zero or more output records
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            // User id carried by the event
            String userID = value.f0;

            // Read the state
            ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
            Map<String, Tuple2<String, Integer>> broadcastStateUserInfo = broadcastState.get(null);

            // Events can arrive before the first configuration snapshot has been broadcast.
            // With no configuration no user is known, so the event is dropped like any unknown user.
            if (broadcastStateUserInfo == null) {
                return;
            }

            // Known user: append userName and userAge to the event.
            // Unknown user: drop the event.
            Tuple2<String, Integer> userInfo = broadcastStateUserInfo.get(userID);
            if(userInfo!=null){
                out.collect(new Tuple6<>(value.f0,value.f1,value.f2,value.f3,userInfo.f0,userInfo.f1));
            }

        }

        /**
         * Replaces the broadcast state with the configuration snapshot just received.
         * @param value configuration snapshot from the broadcast stream
         * @param ctx context giving write access to the broadcast state
         * @param out collector for zero or more output records (unused)
         * @throws Exception if state access fails
         */
        @Override
        public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            // Get the state
            BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);

            // Clear the previous snapshot
            broadcastState.clear();

            // Store the new snapshot
            broadcastState.put(null,value);

        }
    }



    /**
     * Custom MySQL source that reads the configuration every {@code secondInterval} seconds.
     */
    static class MysqlSource extends RichSourceFunction<HashMap<String, Tuple2<String, Integer>>> {

        private String host;
        private Integer port;
        private String db;
        private String user;
        private String passwd;
        private Integer secondInterval;

        private volatile boolean isRunning = true;

        // Runtime-only JDBC resources created in open(); not part of the serialized function.
        private transient Connection connection;
        private transient PreparedStatement preparedStatement;

        MysqlSource(String host, Integer port, String db, String user, String passwd,Integer secondInterval) {
            this.host = host;
            this.port = port;
            this.db = db;
            this.user = user;
            this.passwd = passwd;
            this.secondInterval = secondInterval;
        }

        /**
         * Opens the JDBC connection and prepares the configuration query.
         * @param parameters Flink configuration passed to the rich function
         * @throws Exception if the driver cannot be loaded or the connection fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");
            connection= DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/"+db+"?useUnicode=true&characterEncoding=UTF-8", user, passwd);
            String sql="select userID,userName,userAge from user_info";
            preparedStatement=connection.prepareStatement(sql);
        }

        /**
         * Releases the JDBC resources: the statement first, then the connection that owns it.
         * @throws Exception if closing a resource fails
         */
        @Override
        public void close() throws Exception {
            super.close();

            try {
                if(preparedStatement !=null){
                    preparedStatement.close();
                }
            } finally {
                if(connection!=null){
                    connection.close();
                }
            }
        }

        /**
         * Polls MySQL until cancelled and emits one full configuration snapshot per poll.
         * Any failure is logged and ends the polling loop.
         * @param ctx source context used to emit the snapshots
         */
        @Override
        public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
            try {
                while (isRunning){
                    HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
                    // Close the ResultSet after every poll so cursors do not pile up.
                    try (ResultSet resultSet = preparedStatement.executeQuery()) {
                        while (resultSet.next()){
                            String userID = resultSet.getString("userID");
                            String userName = resultSet.getString("userName");
                            int userAge = resultSet.getInt("userAge");
                            output.put(userID,new Tuple2<>(userName,userAge));
                        }
                    }
                    ctx.collect(output);
                    // Wait before the next query; long arithmetic avoids int overflow.
                    Thread.sleep(1000L*secondInterval);
                }
            }catch (InterruptedException ex){
                // Keep the interrupt flag set for whoever interrupted this thread.
                Thread.currentThread().interrupt();
            }catch (Exception ex){
                log.error("从Mysql获取配置异常...",ex);
            }


        }
        /**
         * Called on cancellation; makes the polling loop in run() exit.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

  • 注意
  1. 需要将维度数据的变化转换成Kafka中的流。
  2. 维度的变化可实时感知。
  3. 维度数据保存在内存中,支持的数据量相对较小。

UDTF + LATERAL TABLE语法

实现方式

  1. 假设你用的是Flink SQL。首先,自定义UDTF, 继承TableFunction抽象类,实现open()、close()、eval()方法。
  2. 注册TableFunction。
  3. 在SQL中使用LATERAL TABLE语法和UDTF运行的结果进行关联。
package com.bigdata.flink.tableSqlTemporalTable;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.ProductInfo;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;


/**
 * Summary:
 *  时态表(Temporal Table)
 */
@Slf4j
public class Test {
    /**
     * Runs the temporal table join demo: a Kafka browse-log stream is joined against a
     * temporal table function built from a Kafka product-info stream, and the joined rows
     * are printed.
     *
     * @param args command-line arguments in the form {@code --application PATH}, where PATH is
     *             the properties file to load; when no arguments are supplied the sample
     *             properties path inside the project is used
     * @throws Exception propagated from reading the properties file or executing the job
     */
    public static void main(String[] args) throws Exception{

        // Use the sample configuration only as a fallback. Overwriting args unconditionally
        // made it impossible to point the job at another properties file.
        if (args == null || args.length == 0) {
            args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlTemporalTable/application.properties"};
        }

        // 1. Parse the command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        // browse log
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        // product history info
        String productInfoTopic = parameterTool.getRequired("productHistoryInfoTopic");
        String productInfoGroupID = parameterTool.getRequired("productHistoryInfoGroupID");

        // 2. Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 3. Register the Kafka source
        // Note: to make the relation between Beijing time and epoch timestamps easy to see,
        // UserBrowseLog carries an extra field eventTimeTimestamp holding eventTime as a timestamp.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("browse",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice,browseRowtime.rowtime");
        //tableEnv.toAppendStream(tableEnv.scan("browse"),Row.class).print();

        // 4. Register the temporal table
        // Note: to make the relation between Beijing time and epoch timestamps easy to see,
        // ProductInfo carries an extra field updatedAtTimestamp holding updatedAt as a timestamp.
        Properties productInfoProperties = new Properties();
        productInfoProperties.put("bootstrap.servers",kafkaBootstrapServers);
        productInfoProperties.put("group.id",productInfoGroupID);
        DataStream<ProductInfo> productInfoStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties))
                .process(new ProductInfoProcessFunction())
                .assignTimestampsAndWatermarks(new ProductInfoTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("productInfo",productInfoStream, "productID,productName,productCategory,updatedAt,updatedAtTimestamp,productInfoRowtime.rowtime");
        // Declare the time attribute and the primary key of the temporal table
        TemporalTableFunction productInfo = tableEnv.scan("productInfo").createTemporalTableFunction("productInfoRowtime", "productID");
        // Register the table function
        tableEnv.registerFunction("productInfoFunc",productInfo);
        //tableEnv.toAppendStream(tableEnv.scan("productInfo"),Row.class).print();

        // 5. Run the SQL
        String sql = ""
                + "SELECT "
                + "browse.userID, "
                + "browse.eventTime, "
                + "browse.eventTimeTimestamp, "
                + "browse.eventType, "
                + "browse.productID, "
                + "browse.productPrice, "
                + "productInfo.productID, "
                + "productInfo.productName, "
                + "productInfo.productCategory, "
                + "productInfo.updatedAt, "
                + "productInfo.updatedAtTimestamp "
                + "FROM "
                + " browse, "
                + " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
                + "WHERE "
                + " browse.productID=productInfo.productID";

        Table table = tableEnv.sqlQuery(sql);
        tableEnv.toAppendStream(table,Row.class).print();

        // 6. Start the job
        tableEnv.execute(Test.class.getSimpleName());
    }


    /**
     * Parses raw Kafka JSON strings into {@link UserBrowseLog} records and fills in the
     * epoch-millisecond timestamp derived from the record's {@code eventTime} string.
     *
     * <p>Records that cannot be parsed are logged and dropped. Only the parsing step is guarded
     * by the catch; emitting happens outside it so that a failure raised while emitting is not
     * swallowed and misreported as a parse error.
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

        /** Format of the incoming eventTime strings; built once instead of once per record. */
        private static final DateTimeFormatter EVENT_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** eventTime strings are interpreted as Beijing time (UTC+08:00). */
        private static final ZoneOffset BEIJING_OFFSET = ZoneOffset.of("+08:00");

        /**
         * Converts one Kafka message into a browse-log record.
         *
         * @param value raw JSON message
         * @param ctx   process-function context (unused)
         * @param out   collector receiving the parsed record
         * @throws Exception if emitting the parsed record fails
         */
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            // Named browseLog rather than log so the enclosing class's logger is not shadowed.
            UserBrowseLog browseLog;
            try {
                browseLog = JSON.parseObject(value, UserBrowseLog.class);

                // Add a long timestamp: eventTime is a yyyy-MM-dd HH:mm:ss string in Beijing time,
                // converted here to epoch milliseconds.
                OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), EVENT_TIME_FORMAT).atOffset(BEIJING_OFFSET);
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                browseLog.setEventTimeTimestamp(eventTimeTimestamp);
            } catch (Exception ex) {
                log.error("解析Kafka数据异常...",ex);
                return;
            }

            out.collect(browseLog);
        }
    }

    /**
     * Timestamp extractor used to generate watermarks for the browse-log stream.
     */
    static class BrowseTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {

        /**
         * @param maxOutOfOrderness out-of-orderness bound handed straight to the parent extractor
         */
        BrowseTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        /**
         * Returns the timestamp stored in the record's {@code eventTimeTimestamp} field.
         *
         * @param element browse-log record
         * @return the record's event timestamp
         */
        @Override
        public long extractTimestamp(UserBrowseLog element) {
            final long eventTimestamp = element.getEventTimeTimestamp();
            return eventTimestamp;
        }
    }





    /**
     * Parses raw Kafka JSON strings into {@link ProductInfo} records and fills in the
     * epoch-millisecond timestamp derived from the record's {@code updatedAt} string.
     *
     * <p>Records that cannot be parsed are logged and dropped. Only the parsing step is guarded
     * by the catch; emitting happens outside it so that a failure raised while emitting is not
     * swallowed and misreported as a parse error.
     */
    static class ProductInfoProcessFunction extends ProcessFunction<String, ProductInfo> {

        /** Format of the incoming updatedAt strings; built once instead of once per record. */
        private static final DateTimeFormatter UPDATED_AT_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** updatedAt strings are interpreted as Beijing time (UTC+08:00). */
        private static final ZoneOffset BEIJING_OFFSET = ZoneOffset.of("+08:00");

        /**
         * Converts one Kafka message into a product-info record.
         *
         * @param value raw JSON message
         * @param ctx   process-function context (unused)
         * @param out   collector receiving the parsed record
         * @throws Exception if emitting the parsed record fails
         */
        @Override
        public void processElement(String value, Context ctx, Collector<ProductInfo> out) throws Exception {
            // Named productInfo rather than log so the enclosing class's logger is not shadowed.
            ProductInfo productInfo;
            try {
                productInfo = JSON.parseObject(value, ProductInfo.class);

                // Add a long timestamp: updatedAt is a yyyy-MM-dd HH:mm:ss string in Beijing time,
                // converted here to epoch milliseconds.
                OffsetDateTime updatedAt = LocalDateTime.parse(productInfo.getUpdatedAt(), UPDATED_AT_FORMAT).atOffset(BEIJING_OFFSET);
                long updatedAtTimestamp = updatedAt.toInstant().toEpochMilli();
                productInfo.setUpdatedAtTimestamp(updatedAtTimestamp);
            } catch (Exception ex) {
                log.error("解析Kafka数据异常...",ex);
                return;
            }

            out.collect(productInfo);
        }
    }

    /**
     * Timestamp extractor used to generate watermarks for the product-info stream.
     */
    static class ProductInfoTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<ProductInfo> {

        /**
         * @param maxOutOfOrderness out-of-orderness bound handed straight to the parent extractor
         */
        ProductInfoTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        /**
         * Returns the timestamp stored in the record's {@code updatedAtTimestamp} field.
         *
         * @param element product-info record
         * @return the record's update timestamp
         */
        @Override
        public long extractTimestamp(ProductInfo element) {
            final long updatedAtTimestamp = element.getUpdatedAtTimestamp();
            return updatedAtTimestamp;
        }
    }

}

定义UDTF:

package com.bigdata.flink.dimJoin;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import redis.clients.jedis.Jedis;

/**
 * User-defined table function (UDTF) that looks up dimension data in Redis.
 *
 * <p>For a given key it reads a string value of the form {@code userName,userAge} and emits
 * one row {@code (userName, userAge)}. Nothing is emitted when the key is null or absent.
 */
public class UDTFRedis extends TableFunction<Row> {

    private Jedis jedis;

    /**
     * Opens the Redis connection and selects database 0.
     *
     * @param context function context supplied by the runtime (unused)
     * @throws Exception if the connection cannot be set up
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        jedis = new Jedis("localhost", 6379);
        jedis.select(0);
    }

    /**
     * Closes the Redis connection if one was opened.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Looks up the dimension data for {@code key} in Redis and emits it as a row.
     *
     * @param key Redis key to look up; a null key yields no row
     * @throws IllegalArgumentException if the stored value does not contain at least two
     *                                  comma-separated fields, or the second one is not an integer
     */
    public void eval(String key) {
        // A null join key cannot match anything; skip the lookup instead of handing null to the client.
        if (key == null) {
            return;
        }

        String value = jedis.get(key);
        if (value == null) {
            return;
        }

        String[] valueSplit = value.split(",");
        // Fail with a message naming the offending entry rather than a bare index error.
        if (valueSplit.length < 2) {
            throw new IllegalArgumentException(
                    "Malformed Redis value for key '" + key + "': expected 'userName,userAge' but got '" + value + "'");
        }

        Row row = new Row(2);
        row.setField(0, valueSplit[0]);
        row.setField(1, Integer.valueOf(valueSplit[1]));
        collector.collect(row);
    }

    /**
     * Declares the type of the emitted rows: {@code userName} as String and {@code userAge} as Int.
     *
     * @return row type information with a String field followed by an Int field
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(Types.STRING, Types.INT);
    }
}

Kafka Join Redis-Dim

package com.bigdata.flink.dimJoin;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 * Joins a Kafka source table with Redis dimension data through a UDTF and LATERAL TABLE.
 */
public class KafkaJoinRedisDimWithUDTF {

    /**
     * Registers the Kafka source table and the Redis UDTF, prints the source rows and the
     * left-joined rows, then executes the job.
     *
     * @param args unused
     * @throws Exception propagated from the table environment
     */
    public static void main(String[] args) throws Exception {

        EnvironmentSettings blinkStreamingSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, blinkStreamingSettings);

        // Source table, also printed as-is
        tableEnv.sqlUpdate(kafkaSourceDdl());
        Table sourceRows = tableEnv.from("source_kafka");
        tableEnv.toAppendStream(sourceRows, Row.class).print();

        // Lookup function
        tableEnv.sqlUpdate(redisUdtfDdl());

        // Left join of every source row with its Redis lookup result
        Table joinedRows = tableEnv.sqlQuery(leftJoinQuery());
        tableEnv.toAppendStream(joinedRows, Row.class).print();

        tableEnv.execute(KafkaJoinRedisDimWithUDTF.class.getSimpleName());
    }

    /**
     * Builds the DDL of the Kafka source table.
     * Sample Kafka record: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
     */
    private static String kafkaSourceDdl() {
        return ""
                + "create table source_kafka "
                + "( "
                + "    userID String, "
                + "    eventType String, "
                + "    eventTime String "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
    }

    /**
     * Builds the DDL that registers the Redis UDTF.
     * Data layout in Redis: key userID, value "userName,userAge", for example
     * 127.0.0.1:6379> get user_1
     * "name1,10"
     */
    private static String redisUdtfDdl() {
        return ""
                + "CREATE TEMPORARY FUNCTION "
                + "  IF NOT EXISTS UDTFRedis "
                + "  AS 'com.bigdata.flink.dimJoin.UDTFRedis'";
    }

    /** Builds the LEFT JOIN LATERAL TABLE query between the source table and the UDTF. */
    private static String leftJoinQuery() {
        return ""
                + "select "
                + " source_kafka.*,dim.* "
                + "from source_kafka "
                + "LEFT JOIN LATERAL TABLE(UDTFRedis(userID)) as dim (userName,userAge) ON TRUE";
    }
}

  • 注意
  1. 需要定义UDTF和使用LATERAL TABLE语法。
  2. 不是很通用,如想用一个UDTF实现所有从Redis获取维度数据的场景,很难实现。
  3. 依赖外部存储,当数据变化时,可及时获取。

LookupableTableSource

实现方式

数据源实现LookupableTableSource接口。

在Flink SQL中直接注册Lookup表即可,在Flink Table API中需要注册LookupFunction 。

本质上,还是通过TableFunction来获取维度数据。

package com.bigdata.flink.tableSqlLookableTableSource;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 *  Lookup Table Source
 */
@Slf4j
public class Test {
    /**
     * Runs the lookup table source demo: a Kafka browse-log stream is enriched with user data
     * looked up in HBase and product data looked up in MySQL, and the result is printed.
     *
     * @param args command-line arguments in the form {@code --application PATH}, where PATH is
     *             the properties file to load; when no arguments are supplied the sample
     *             properties path inside the project is used
     * @throws Exception propagated from reading the properties file or executing the job
     */
    public static void main(String[] args) throws Exception{

        // Use the sample configuration only as a fallback. Overwriting args unconditionally
        // made it impossible to point the job at another properties file.
        if (args == null || args.length == 0) {
            args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlLookableTableSource/application.properties"};
        }

        // 1. Parse the command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        String hbaseZookeeperQuorum = parameterTool.getRequired("hbaseZookeeperQuorum");
        String hbaseZnode = parameterTool.getRequired("hbaseZnode");
        String hbaseTable = parameterTool.getRequired("hbaseTable");

        String mysqlDBUrl = parameterTool.getRequired("mysqlDBUrl");
        String mysqlUser = parameterTool.getRequired("mysqlUser");
        String mysqlPwd = parameterTool.getRequired("mysqlPwd");
        String mysqlTable = parameterTool.getRequired("mysqlTable");

        // 2. Set up the execution environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 3. Register the Kafka source
        // Hand-made test data: a user clicked a product at some moment, together with the product price, e.g.
        //{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_1", "productPrice": 20}
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());
        tableEnv.registerDataStream("kafka",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice");
        //tableEnv.toAppendStream(tableEnv.scan("kafka"),Row.class).print();

        // 4. Register the HBase source (lookup table source)
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        conf.set("zookeeper.znode.parent",hbaseZnode);
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, hbaseTable);
        hBaseTableSource.setRowKey("uid",String.class);
        hBaseTableSource.addColumn("f1","name",String.class);
        hBaseTableSource.addColumn("f1","age",Integer.class);
        tableEnv.registerTableSource("hbase",hBaseTableSource);
        // Register the lookup function as a table function
        tableEnv.registerFunction("hbaseLookup", hBaseTableSource.getLookupFunction(new String[]{"uid"}));

        // 5. Register the MySQL source (lookup table source)
        String[] mysqlFieldNames={"pid","productName","productCategory","updatedAt"};
        DataType[] mysqlFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()};
        TableSchema mysqlTableSchema = TableSchema.builder().fields(mysqlFieldNames, mysqlFieldTypes).build();
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("com.mysql.jdbc.Driver")
                .setDBUrl(mysqlDBUrl)
                .setUsername(mysqlUser)
                .setPassword(mysqlPwd)
                .setTableName(mysqlTable)
                .build();

        JDBCLookupOptions jdbcLookupOptions = JDBCLookupOptions.builder()
                .setCacheExpireMs(10 * 1000) // cache entry lifetime
                .setCacheMaxSize(10) // maximum number of cached rows
                .setMaxRetryTimes(3) // maximum number of retries
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setLookupOptions(jdbcLookupOptions)
                .setSchema(mysqlTableSchema)
                .build();
        tableEnv.registerTableSource("mysql",jdbcTableSource);
        // Register the lookup function as a table function
        tableEnv.registerFunction("mysqlLookup",jdbcTableSource.getLookupFunction(new String[]{"pid"}));

        // 6. Query
        // Fetch the user's basic information from the HBase table by userID
        // Fetch the product's basic information from the MySQL table by productID
        String sql = ""
                + "SELECT "
                + "       userID, "
                + "       eventTime, "
                + "       eventType, "
                + "       productID, "
                + "       productPrice, "
                + "       f1.name AS userName, "
                + "       f1.age AS userAge, "
                + "       productName, "
                + "       productCategory "
                + "FROM "
                + "     kafka, "
                + "     LATERAL TABLE(hbaseLookup(userID)), "
                + "     LATERAL TABLE (mysqlLookup(productID))";

        tableEnv.toAppendStream(tableEnv.sqlQuery(sql),Row.class).print();

        // 7. Start the job
        tableEnv.execute(Test.class.getSimpleName());
    }

    /**
     * Parses raw Kafka JSON strings into {@link UserBrowseLog} records and fills in the
     * epoch-millisecond timestamp derived from the record's {@code eventTime} string.
     *
     * <p>Records that cannot be parsed are logged and dropped. Only the parsing step is guarded
     * by the catch; emitting happens outside it so that a failure raised while emitting is not
     * swallowed and misreported as a parse error.
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

        /** Format of the incoming eventTime strings; built once instead of once per record. */
        private static final DateTimeFormatter EVENT_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** eventTime strings are interpreted as Beijing time (UTC+08:00). */
        private static final ZoneOffset BEIJING_OFFSET = ZoneOffset.of("+08:00");

        /**
         * Converts one Kafka message into a browse-log record.
         *
         * @param value raw JSON message
         * @param ctx   process-function context (unused)
         * @param out   collector receiving the parsed record
         * @throws Exception if emitting the parsed record fails
         */
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            // Named browseLog rather than log so the enclosing class's logger is not shadowed.
            UserBrowseLog browseLog;
            try {
                browseLog = JSON.parseObject(value, UserBrowseLog.class);

                // Add a long timestamp: eventTime is a yyyy-MM-dd HH:mm:ss string in Beijing time,
                // converted here to epoch milliseconds.
                OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), EVENT_TIME_FORMAT).atOffset(BEIJING_OFFSET);
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                browseLog.setEventTimeTimestamp(eventTimeTimestamp);
            } catch (Exception ex) {
                log.error("解析Kafka数据异常...",ex);
                return;
            }

            out.collect(browseLog);
        }
    }

}

package com.bigdata.flink.dimJoin;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 *  Joins a Kafka source table with a MySQL dimension table using FOR SYSTEM_TIME AS OF.
 */
public class KafkaJoinMysqlDim {

    /**
     * Registers the Kafka source table and the MySQL dimension table, prints the
     * left-joined rows, then executes the job.
     *
     * @param args unused
     * @throws Exception propagated from the table environment
     */
    public static void main(String[] args) throws Exception {

        EnvironmentSettings blinkStreamingSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, blinkStreamingSettings);

        // Source table
        tableEnv.sqlUpdate(kafkaSourceDdl());
        //tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();

        // Dimension table
        tableEnv.sqlUpdate(mysqlDimDdl());

        // Left join of every source row with the dimension table as of the row's processing time
        Table joinedRows = tableEnv.sqlQuery(leftJoinQuery());
        tableEnv.toAppendStream(joinedRows, Row.class).print();

        tableEnv.execute(KafkaJoinMysqlDim.class.getSimpleName());
    }

    /**
     * Builds the DDL of the Kafka source table, including the computed proctime column.
     * Sample Kafka record: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
     */
    private static String kafkaSourceDdl() {
        return ""
                + "create table source_kafka "
                + "( "
                + "    userID STRING, "
                + "    eventType STRING, "
                + "    eventTime STRING, "
                + "    proctime AS PROCTIME() "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
    }

    /**
     * Builds the DDL of the MySQL dimension table.
     * Sample dimension data:
     * mysql> select * from t_user_info limit 1;
     * +--------+----------+---------+
     * | userID | userName | userAge |
     * +--------+----------+---------+
     * | user_1 | name1    |      10 |
     * +--------+----------+---------+
     */
    private static String mysqlDimDdl() {
        return ""
                + "CREATE TABLE dim_mysql ( "
                + "    userID STRING, "
                + "    userName STRING, "
                + "    userAge INT "
                + ") WITH ( "
                + "    'connector.type' = 'jdbc', "
                + "    'connector.url' = 'jdbc:mysql://localhost:3306/bigdata', "
                + "    'connector.table' = 't_user_info', "
                + "    'connector.driver' = 'com.mysql.jdbc.Driver', "
                + "    'connector.username' = '****', "
                + "    'connector.password' = '******' "
                + ")";
    }

    /** Builds the LEFT JOIN query between the source table and the dimension table. */
    private static String leftJoinQuery() {
        return ""
                + "SELECT "
                + "  kafka.*,mysql.userName,mysql.userAge "
                + "FROM "
                + "  source_kafka as kafka"
                + "  LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF kafka.proctime AS mysql "
                + "  ON kafka.userID = mysql.userID";
    }
}

  • 注意
  1. 需要实现LookupableTableSource接口。
  2. 比较通用。
  3. 依赖外部存储,当数据变化时,可及时获取。
  4. 目前仅支持Blink Planner。

你好,我是王知无,一个大数据领域的硬核原创作者。 做过后端架构、数据中间件、数据平台&架构、算法工程化。