Apache FlinkCEP: Implementing Timeout State Monitoring, Step by Step


CEP stands for Complex Event Processing. Typical timeout scenarios include:

An order is placed but payment is not confirmed within a certain amount of time.

A ride-hailing order is created but the pickup is not confirmed within a certain amount of time.

A food-delivery order is not confirmed as delivered within a certain time past the promised delivery time.

Apache FlinkCEP API

CEPTimeoutEventJob

A brief look at the FlinkCEP source code

DataStream and PatternStream

A DataStream generally consists of events or elements of the same type; one DataStream can be turned into another DataStream through a series of transformations such as Filter and Map.
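For example, a minimal sketch (not from the original article) of one DataStream becoming another through Filter and Map:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> statuses = env.fromElements("02", "00", "01");
DataStream<String> cleaned = statuses
        .filter(s -> !s.isEmpty())   // Filter: drop empty elements
        .map(String::trim);          // Map: produce a new DataStream of transformed elements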

A PatternStream is the abstraction of a stream being matched against a CEP pattern. It combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not itself a DataStream: it emits the matched pattern sequences, as a map from pattern name to the list of associated events (Map<pattern name, List<event>>), into a SingleOutputStreamOperator, and SingleOutputStreamOperator is a DataStream.

Methods and variables in the CEPOperatorUtils utility class are named after "PatternStream", for example:

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...) { ... }

public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...) { ... }

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> { ... }

The PatternStream constructor:


PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) { ... }


Pattern is the base class for pattern definitions and uses the Builder pattern; a defined pattern is then used by NFACompiler to generate an NFA.

If you want to implement your own methods similar to next and followedBy, say a timeEnd, extending Pattern and overriding it should be feasible.
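As a reference point, here is a minimal sketch of the builder-style definition such an extension would plug into; the Event type and its getStatus() accessor are assumed purely for illustration:

Pattern<Event, Event> payTimeout = Pattern.<Event>begin("create")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return "CREATED".equals(e.getStatus());
            }
        })
        .followedBy("pay")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return "PAID".equals(e.getStatus());
            }
        })
        .within(Time.minutes(15));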

public class Pattern<T, F extends T> {

    /** Name of the pattern */
    private final String name;

    /** The previous pattern */
    private final Pattern<T, ? extends T> previous;

    /** The constraint an event has to satisfy to be matched by this pattern */
    private IterativeCondition<F> condition;

    /** Window length within which the pattern match has to occur */
    private Time windowTime;

    /** The pattern quantifier, i.e. how many events this pattern matches; by default it matches one */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

    /** The condition an event has to satisfy to stop collecting events into the looping state */
    private IterativeCondition<F> untilCondition;

    /**
     * Applies to {@code times} patterns; keeps how many times events may occur consecutively in this pattern.
     */
    private Times times;

    // Skip strategy applied after a match
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;

Quantifier describes the concrete matching behaviour of a pattern. There are three main kinds:

Single (a single match), Looping (a looping match), and Times (a fixed number of occurrences, or any count within a range, will match).

Each Pattern can be optional (whether single or looping) and can be given a ConsumingStrategy.

Looping and Times additionally carry an inner ConsumingStrategy that is applied between the events accepted into the pattern.


public class Quantifier {

    /**
     * 5 properties that can be combined, though not every combination is valid
     */
    public enum QuantifierProperty {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
    }

    /**
     * Strategy describing which events are matched in this pattern
     */
    public enum ConsumingStrategy {
        STRICT,
        SKIP_TILL_NEXT,
        SKIP_TILL_ANY,
        NOT_FOLLOW,
        NOT_NEXT
    }

    /**
     * Describes how many times events may occur consecutively in this pattern. A pattern condition is
     * essentially a boolean: events satisfying it that occur "times" times in a row are matched, or any
     * count within a range, e.g. 2~4 means runs of 2, 3 or 4 events are all matched by this pattern, so
     * the same event can be matched repeatedly.
     */
    public static class Times {

        private final int from;
        private final int to;

        private Times(int from, int to) {
            Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
            Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
            this.from = from;
            this.to = to;
        }

        public int getFrom() {
            return from;
        }

        public int getTo() {
            return to;
        }

        // A range of times
        public static Times of(int from, int to) {
            return new Times(from, to);
        }

        // An exact number of times
        public static Times of(int times) {
            return new Times(times, times);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Times times = (Times) o;
            return from == times.from && to == times.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }
}
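A minimal sketch (assuming a hypothetical Event type with getLevel()) of how these quantifiers surface on the Pattern builder:

Pattern<Event, Event> warnings = Pattern.<Event>begin("warnings")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return "WARN".equals(e.getLevel());
            }
        })
        .times(2, 4)   // Times: runs of 2, 3 or 4 consecutive matching events are all emitted
        .optional();   // OPTIONAL: the whole block may also be absent from a match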

EventComparator: a custom event comparator, created by implementing the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}
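A minimal sketch of a custom comparator, assuming a hypothetical Event type with getPriority(); it could then be passed as the third argument of CEP.pattern(input, pattern, comparator), shown in the CEP class further below:

public class PriorityComparator implements EventComparator<Event> {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Event a, Event b) {
        // For events with equal timestamps, the lower priority value is handled first
        return Integer.compare(a.getPriority(), b.getPriority());
    }
}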

NFACompiler and NFA

NFACompiler provides methods for compiling a Pattern into an NFA or into an NFAFactory; with an NFAFactory, multiple NFAs can be created.

public class NFACompiler {

    /**
     * NFAFactory: the interface for creating an NFA.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    public interface NFAFactory<T> extends Serializable {
        NFA<T> createNFA();
    }

    /**
     * NFAFactoryImpl: the concrete implementation of NFAFactory.
     *
     * <p>The implementation takes the input type serializer, the window time and the set of
     * states and their transitions to be able to create an NFA from them.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    private static class NFAFactoryImpl<T> implements NFAFactory<T> {

        private static final long serialVersionUID = 8939783698296714379L;

        private final long windowTime;
        private final Collection<State<T>> states;
        private final boolean timeoutHandling;

        private NFAFactoryImpl(
                long windowTime,
                Collection<State<T>> states,
                boolean timeoutHandling) {

            this.windowTime = windowTime;
            this.states = states;
            this.timeoutHandling = timeoutHandling;
        }

        @Override
        public NFA<T> createNFA() {
            // An NFA is made of the set of states, the window length and a flag for timeout handling
            return new NFA<>(states, windowTime, timeoutHandling);
        }
    }
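A sketch of how the factory is exercised; NFACompiler is internal API, the two-argument compileFactory call simply mirrors the one in the CEPOperatorUtils snippet further below, and the pattern variable and Event type are assumed:

// Sketch only: each parallel operator instance can build its own NFA from the shared factory
NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, true); // true = handle timeouts
NFA<Event> nfa = nfaFactory.createNFA();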

NFA: Non-deterministic Finite Automaton.

For more background, see https://zh.wikipedia.org/wiki/非确定有限状态自动机

public class NFA<T> {

    /**
     * All valid NFA states, as returned by the NFACompiler.
     * These are directly derived from the user-specified pattern.
     */
    private final Map<String, State<T>> states;

    /**
     * The window length specified via Pattern.within(Time)
     */
    private final long windowTime;

    /**
     * Flag indicating whether timed-out matches are handled
     */
    private final boolean handleTimeout;

 

PatternSelectFunction and PatternFlatSelectFunction

PatternSelectFunction's select() method is called with a map of the matched events, accessible by pattern name; the pattern names are those assigned when the Pattern was defined. select() returns exactly one result; if you need to return several results, implement PatternFlatSelectFunction instead.

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates a result from the given map of events; the events are identified by their associated pattern names.
     */
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
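A minimal sketch of an implementation, assuming a hypothetical Event type with getId() and the pattern names "start" and "end":

public class MatchToString implements PatternSelectFunction<Event, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String select(Map<String, List<Event>> pattern) throws Exception {
        // The keys must match the names used when the Pattern was defined
        return "matched: " + pattern.get("start").get(0).getId()
                + " -> " + pattern.get("end").get(0).getId();
    }
}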

 

PatternFlatSelectFunction does not return a single OUT; instead it uses a Collector to collect the matched events.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates one or more results.
     */
    void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
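A minimal sketch of an implementation that forwards every event of a match downstream (again assuming a hypothetical Event type):

public class EmitAllMatchedEvents implements PatternFlatSelectFunction<Event, Event> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatSelect(Map<String, List<Event>> pattern, Collector<Event> out) throws Exception {
        for (List<Event> events : pattern.values()) {
            for (Event e : events) {
                out.collect(e);   // zero, one or many results per match
            }
        }
    }
}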

SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when createTimeoutPatternStream() is called in CEPOperatorUtils.

The methods of SelectTimeoutCepOperator that the operator invokes repeatedly are processMatchedSequences() and processTimedOutSequences().

They are the template-method counterparts of processEvent() and advanceTime() in the abstract class AbstractKeyedCEPPatternOperator.

There is also FlatSelectTimeoutCepOperator with its corresponding PatternFlatTimeoutFunction.

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
        extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

    private OutputTag<OUT2> timedOutOutputTag;

    public SelectTimeoutCepOperator(
            TypeSerializer<IN> inputSerializer,
            boolean isProcessingTime,
            NFACompiler.NFAFactory<IN> nfaFactory,
            final EventComparator<IN> comparator,
            AfterMatchSkipStrategy skipStrategy,
            // The "flat" prefix in these parameter names is misleading (the same applies to the fields of SelectWrapper)
            PatternSelectFunction<IN, OUT1> flatSelectFunction,
            PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
            OutputTag<OUT2> outputTag,
            OutputTag<IN> lateDataOutputTag) {
        super(
            inputSerializer,
            isProcessingTime,
            nfaFactory,
            comparator,
            skipStrategy,
            new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
            lateDataOutputTag);
        this.timedOutOutputTag = outputTag;
    }
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
    void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

 

CEP and CEPOperatorUtils

CEP is the utility class for creating a PatternStream; a PatternStream is just the combination of a DataStream and a Pattern.

public class CEP {

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
        return new PatternStream<>(input, pattern);
    }

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
        return new PatternStream<>(input, pattern, comparator);
    }
}

 

CEPOperatorUtils creates the SingleOutputStreamOperator (which is a DataStream) when PatternStream's select() or flatSelect() method is called.

public class CEPOperatorUtils {

    private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
            final DataStream<IN> inputStream,
            final Pattern<IN, ?> pattern,
            final TypeInformation<OUT> outTypeInfo,
            final boolean timeoutHandling,
            final EventComparator<IN> comparator,
            final OperatorBuilder<IN, OUT> operatorBuilder) {
        final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

        // check whether we use processing time
        final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

        // compile our pattern into a NFAFactory to instantiate NFAs later on
        final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

        final SingleOutputStreamOperator<OUT> patternStream;

        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

            patternStream = keyedStream.transform(
                operatorBuilder.getKeyedOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()));
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

            patternStream = inputStream.keyBy(keySelector).transform(
                operatorBuilder.getOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()
                )).forceNonParallel();
        }

        return patternStream;
    }

FlinkCEP implementation steps:

IN: DataSource -> DataStream -> Transformations -> DataStream
Pattern: Pattern.begin.where.next.where.times
PatternStream: CEP.pattern(DataStream, Pattern)
DataStream: PatternStream.select(PatternSelectFunction) / PatternStream.flatSelect(PatternFlatSelectFunction)
OUT: DataStream -> Transformations -> DataStream -> DataSink
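A minimal end-to-end sketch of these steps (not from the original article), using a DataStream<String> just to keep it self-contained:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("02", "00", "01");              // IN

Pattern<String, String> pattern = Pattern.<String>begin("init")             // Pattern
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return "02".equals(s);
            }
        })
        .next("end")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return "00".equals(s);
            }
        });

PatternStream<String> patternStream = CEP.pattern(input, pattern);          // PatternStream

DataStream<String> matches = patternStream.select(                          // DataStream
        new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, List<String>> m) throws Exception {
                return m.get("init").get(0) + " -> " + m.get("end").get(0);
            }
        });

matches.print();                                                             // OUT
env.execute("cep-sketch");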

FlinkCEP timeout matching: implementation steps

The stream fed into timeout CEP needs to be keyed, i.e. a KeyedStream; if the inputStream is not a KeyedStream, a 0-byte key is created with a NullByteKeySelector (as shown in the CEPOperatorUtils source above):

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

The Pattern must end with within() to set the window time. If the stream is keyed by its primary key, at most one timeout event is matched per key within one time window, so PatternStream.select(...) is sufficient (a sketch follows after the step list below).

IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
Pattern: Pattern.begin.where.next.where.within(Time windowTime)
PatternStream: CEP.pattern(KeyedStream, Pattern)
OutputTag: new OutputTag( )
SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
OUT: DataStream -> Transformations -> DataStream -> DataSink
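If one result per match or per timeout is enough, the select-based variant mentioned above would look roughly like this; the POJO type, the patternStream variable and the "init"/"end" names are the ones used in the demo below, and the exact select overload is assumed from this Flink version's PatternStream API:

OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {};

SingleOutputStreamOperator<POJO> completed = patternStream.select(
        timedout,
        new PatternTimeoutFunction<POJO, POJO>() {
            @Override
            public POJO timeout(Map<String, List<POJO>> pattern, long timeoutTimestamp) throws Exception {
                // exactly one timed-out result per expired partial match
                return pattern.get("init").get(0);
            }
        },
        new PatternSelectFunction<POJO, POJO>() {
            @Override
            public POJO select(Map<String, List<POJO>> pattern) throws Exception {
                // exactly one result per complete match
                return pattern.get("end").get(0);
            }
        });

DataStream<POJO> timedOutPojos = completed.getSideOutput(timedout);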

Where FlinkCEP timeouts fall short

As with Flink window aggregation, if you use event time and watermarks derived from the events themselves, subsequent events must arrive before the pattern is evaluated and timed-out results are emitted.
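One common mitigation, sketched here as an assumption rather than something from the original article, is to drive watermarks from the machine clock so that an idle source still lets timed-out partial matches fire; the IngestionTimeExtractor used in the demo below behaves in essentially this way:

public class ProcessingTimeWatermarks<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        return System.currentTimeMillis();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Advances even when no events arrive, so within(...) timeouts can fire
        return new Watermark(System.currentTimeMillis() - 1);
    }
}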

Complete FlinkCEP timeout demo

public class CEPTimeoutEventJob {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
    private static final String GROUP_TOPIC = GROUP_ID;

    public static void main(String[] args) throws Exception {
        // Arguments
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

        // Do not use the POJO's own time field
        final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<>();

        // Keep the parallelism consistent with the number of partitions of the Kafka topic
        env.setParallelism(3);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", GROUP_ID);

        // Consume messages from Kafka
        FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
        DataStream<POJO> pojoDataStream = env.addSource(consumer)
                .assignTimestampsAndWatermarks(extractor);
        pojoDataStream.print();

        // Key by the primary key aid, i.e. run match detection per POJO event
        // [different POJO types could use different within() times]
        // 1.
        DataStream<POJO> keyedPojos = pojoDataStream
                .keyBy("aid");

        // A complete POJO event sequence, from the initial state to a final state
        // 2.
        Pattern<POJO, POJO> completedPojo =
                Pattern.<POJO>begin("init")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = 6847788055093903603L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "02".equals(pojo.getAstatus());
                            }
                        })
                        .followedBy("end")
                        // .next("end")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = 2655089736460847552L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
                            }
                        });

        // Find the aids that have not reached a final state within 1 minute [kept short for easy testing]
        // If different types need different within() times, e.g. 1 minute for some and 1 hour for others,
        // create multiple PatternStreams
        // 3.
        PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

        // Define the "timedout" side output
        // 4.
        OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
            private static final long serialVersionUID = 773503794597666247L;
        };

        // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
        // 5.
        SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
                timedout,
                new POJOTimedOut(),
                new FlatSelectNothing<POJO>()
        );

        // Print the timed-out POJOs
        // 6.7.
        timeoutPojos.getSideOutput(timedout).print();
        timeoutPojos.print();

        env.execute(CEPTimeoutEventJob.class.getSimpleName());
    }

    /**
     * Collect the timed-out events.
     */
    public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
        private static final long serialVersionUID = 4214641891396057732L;

        @Override
        public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
            if (null != map.get("init")) {
                for (POJO pojoInit : map.get("init")) {
                    System.out.println("timeout init:" + pojoInit.getAid());
                    collector.collect(pojoInit);
                }
            }
            // "end" timed out, i.e. it has not arrived yet, so it cannot be fetched here
            System.out.println("timeout end:" + map.get("end"));
        }
    }

    /**
     * Usually does nothing, but it could also forward all matched events downstream; with relaxed
     * contiguity, the events that were skipped over cannot be selected and emitted here.
     * Handles the data that goes through both init and end within one minute.
     *
     * @param <T>
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        private static final long serialVersionUID = 3029589950677623844L;

        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("flatSelect: " + pattern);
        }
    }
}

Test output (followedBy):

3> POJO{aid=ID000-0, a > STYLE000-0, aname=NAME-0, logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
3> POJO{aid=ID000-1, a > STYLE000-2, aname=NAME-1, logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
3> POJO{aid=ID000-0, a > STYLE000-0, aname=NAME-0, logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus=00, createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid=ID000-0, a > STYLE000-0, aname=NAME-0, logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}], end=[POJO{aid=ID000-0, a > STYLE000-0, aname=NAME-0, logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus=00, createTime=null, updateTime=null}]}
timeout init:ID000-1
3> POJO{aid=ID000-1, a > STYLE000-2, aname=NAME-1, logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
timeout end:null
3> POJO{aid=ID000-2, a > STYLE000-0, aname=NAME-0, logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus=03, createTime=null, updateTime=null}
3> POJO{aid=ID000-2, a > STYLE000-0, aname=NAME-0, logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus=00, createTime=null, updateTime=null}
3> POJO{aid=ID000-3, a > STYLE000-0, aname=NAME-0, logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
3> POJO{aid=ID000-3, a > STYLE000-2, aname=NAME-0, logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus=03, createTime=null, updateTime=null}
3> POJO{aid=ID000-3, a > STYLE000-2, aname=NAME-0, logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus=01, createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid=ID000-3, a > STYLE000-0, aname=NAME-0, logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}], end=[POJO{aid=ID000-3, a > STYLE000-2, aname=NAME-0, logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus=01, createTime=null, updateTime=null}]}
3> POJO{aid=ID000-4, a > STYLE000-0, aname=NAME-0, logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
3> POJO{aid=ID000-4, a > STYLE000-0, aname=NAME-0, logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus=03, createTime=null, updateTime=null}
timeout init:ID000-4
3> POJO{aid=ID000-4, a > STYLE000-0, aname=NAME-0, logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus=02, createTime=null, updateTime=null}
timeout end:null

That is the walkthrough of implementing timeout state monitoring with Apache FlinkCEP. I hope it helps; if you have any questions, feel free to leave a comment and I will reply as soon as possible!

