zl程序教程

您现在的位置是:首页 >  其他

当前栏目

flink keyby指定key方式详解

2023-04-18 16:19:22 时间

1.keyby算子

keyby是flink中非常常见的操作。其作用为在逻辑上将流划分为不相交的分区,而具有相同key的数据都分配到同一个分区。这种操作在各种大数据计算引擎中都非常常见,比如最早的mapreduce,从map阶段到reduce阶段,就是通过shuffle操作将具有相同key的数据分配到同一个reduce端进行处理。在flink内部,keyby是通过哈希分区来实现的,并且自带有多种指定key的方式。

2.源码分析

我们先通过源码,来看看keyby指定key的几种不同方式,flink版本1.7.2

	/**
	 * Partitions the operator state of a {@link DataStream} by the given key positions.
	 *
	 * @param fields
	 *            The position of the fields on which the {@link DataStream}
	 *            will be grouped.
	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
	 */
	public KeyedStream<T, Tuple> keyBy(int... fields) {
		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
			return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
		} else {
			return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
		}
	}

第一种方式,通过指定字段的位置来进行分组,输入参数为一个或多个整数,整数即代表字段对应位置。

	/**
	 * Partitions the operator state of a {@link DataStream} using field expressions.
	 * A field expression is either the name of a public field or a getter method with parentheses
	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
	 * down into objects, as in {@code "field1.getInnerField2()" }.
	 *
	 * @param fields
	 *            One or more field expressions on which the state of the {@link DataStream} operators will be
	 *            partitioned.
	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
	 **/
	public KeyedStream<T, Tuple> keyBy(String... fields) {
		return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
	}

第二种方式,通过指定字段名来指定key。这个字段名是有一定要求的,后面我们再详细解释。

	/**
	 * It creates a new {@link KeyedStream} that uses the provided key for partitioning
	 * its operator states.
	 *
	 * @param key
	 *            The KeySelector to be used for extracting the key for partitioning
	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
	 */
	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
		Preconditions.checkNotNull(key);
		return new KeyedStream<>(this, clean(key));
	}

第三种方式,通过KeySelector的方式指定。

而KeySelector是一个接口,里面只有一个方法getKey,我们使用的时候实现getKey方法即可。

@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
	KEY getKey(IN value) throws Exception;
}

3.通过字段号指定key

通过字段号指定key相对比较简单,直接看一个wordcount例子即可。

    public static void baseVersion() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stream = env.fromElements("java python c python python c");
        DataStream<Tuple2<String, Integer>> flatstream = stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for(String word: value.split("\W+")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
                .keyBy(0)
                .sum(1);

        flatstream.print();
        env.execute("keyby base version");
    }

keyBy(0)表示对第一个字段,即word进行分区,而sum(1)则表示对第二个字段即count进行求和。

4.通过字段名指定key

通过字段号指定key使用比较简单方便,但是如果是比较复杂的场景,就不好搞定了。比如如果数据是个比较复杂的嵌套结构Tuple2<Tuple2<String, Integer>, Integer>,如果我们想对内部嵌套的Tuple2的第一个字段进行keyby操作,就无法通过字段号来操作,这个时候我们可以通过字段名的方式来进行代替。
字段名的方式相对来说复杂一些,下面我们来进行示范。

还是先以简单的wordcount为例。

先定义个内部静态类,静态类包含有两个字段,分别为word与count。

    public static final class WC {
        public String word;
        public int count;
        public WC() {}
        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
        public String getWord() {
            return word;
        }
        public int getCount() {
            return count;
        }
        public void setWord(String word) {
            this.word = word;
        }
        public void setCount(int count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return this.word + ": " + this.count;
        }
    }

该POLO类中的两个字段word与count,可以传到keyby算子中。

然后再进行flink相关代码的编写。

    public static void nameVersion() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);`在这里插入代码片`
        WC wc1 = new WC("java", 1);
        WC wc2 = new WC("python", 2);
        WC wc3 = new WC("c", 3);
        WC wc4 = new WC("c", 4);
        WC wc5 = new WC("java", 5);

        DataStream<WC> stream = env.fromElements(wc1, wc2, wc3, wc4, wc5);
        stream = stream.keyBy("word").sum("count");
        stream.print();

        env.execute("keyed base");
    }

上面代码输出为

java: 1
python: 2
c: 3
c: 7
java: 6

注意上面的WC pojo类是有要求的
1.keyby中的字段名必须与pojo类的字段名一致。
2.pojo类一定要提供默认的构造函数,否则代码会报如下错误。

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<xxx.xxx.xxx.WC>) cannot be used as key.
	at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
	at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
	...

3.字段需要提供get/set方法。(但是在1.7.2版本测试,如果对字段不提供get/set方法,wordcount代码也可以正常运行)

5.通过嵌套字段名指定key

接下来我们看嵌套的字段名如何在keyby中被指定。

    public static final class WC {
        public int count;
        public InnerClass inner;
        public WC() {}
        public WC(InnerClass inner, int count) {
            this.inner = inner;
            this.count = count;
        }

        public int getCount() {
            return count;
        }
        public WC setCount(int count) {
            this.count = count;
            return this;
        }

        @Override
        public String toString() {
            return this.inner.name + ": " + this.count;
        }
    }
    public static final class InnerClass {
        public String name;
        public String department;

        public InnerClass() {}
        public InnerClass(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }
        public InnerClass setName(String name) {
            this.name = name;
            return this;
        }
        public String getDepartment() {
            return department;
        }
        public InnerClass setDepartment(String department) {
            this.department = department;
            return this;
        }
    }

首先我们定义了两个pojo类,一个是WC类,包含有count字段以及InnerClass对象。而InnerClass有name与department两个字段。
有同学可能会问,搞这么复杂干嘛,直接将所有字段定义到WC类中不就好了。同学们,我们这里是演示嵌套字段的用法…

接下来,我们想将name字段指定为keyby中的key

    public static void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        InnerClass inn1 = new InnerClass("jojo"); InnerClass inn2 = new InnerClass("jojo");
        InnerClass inn3 = new InnerClass("lili"); InnerClass inn4 = new InnerClass("lili");
        WC wc1 = new WC(inn1, 1);
        WC wc2 = new WC(inn2, 2);
        WC wc3 = new WC(inn3, 3);
        WC wc4 = new WC(inn4, 4);

        DataStream<WC> stream = env.fromElements(wc1, wc2, wc3, wc4)
                .keyBy("inner.name")
                .sum("count");
        stream.print();
        env.execute("keyby complex version");
    }

上面的例子中
count指的是WC中的count字段
inner.word指的是InnerClass中的word字段,inner则表示WC类中的inner属性。

这样就达到了指定复杂嵌套结构中key的目的。

6.通过KeySelector的方式指定key

看一个例子,就能明白上述方式的用法。

    public static void keyselect() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stream = env.fromElements("java python c python python c");
        DataStream<Tuple2<String, Integer>> flatstream = stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for(String word: value.split("\W+")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
                .keyBy((KeySelector<Tuple2<String, Integer>, Object>) value -> value.f0)
                .sum(1);

        flatstream.print();
        env.execute("key select version");
    }