zl程序教程

您现在的位置是:首页 >  其他

当前栏目

FlinkCEP的底层理论:NFA-b Automaton原理介绍

原理 介绍 理论 底层
2023-09-11 14:16:24 时间

1. 基本概念

1.1. NFAb介绍

FlinkCEP是基于《Efficient Pattern Matching over Event Streams》这篇论文的思想实现的。

该论文提出了一种在事件流上进行高效模式匹配的方法,即带匹配缓存非确定有限自动机【Non-deterministic finite automaton】,又称为NFAb自动机。

1.2. SASE+

SASE+语言:一个复杂的事件语言,它支持事件流上的Kleene闭包,并对该语言的可表达性提供了形式化的分析。
SASE+是一种专门用来描述CEP pattern的通用语言

1.3. CEP模式操作符

CEP模式操作符
【org.apache.flink.cep.operator.CepOperator】

  • 针对键控输入流【keyed input stream】,对于每个键,CepOperator创建一个NFA和一个优先队列来缓冲顺序乱序的事件,这两种数据结构都使用托管键控状态进行存储。
  • 针对于非键控输入流,CepOperator创建一个的全局NFA。

当事件被处理时,它更新NFA的内部状态机。

属于部分匹配序列的事件被保存在内部的SharedBuffer缓冲区【一个内存优化的数据结构】中。
当包含事件的所有匹配序列时碰到如下情况时,将删除缓冲区中的事件:

  • emitted (success)
  • discarded (patterns containing NOT)
  • timed-out (windowed patterns)

2. NFAbAutomaton原理

2.1. NFAbAutomaton的定义与构造

在大学《形式语言与自动机》课程中的非确定有限状态机(NFA),用一句话概括就是:对于每个<状态,输入符号>二元组,其状态转移可以有多个,而不是确定的一个。
NFAbAutomaton的定义与普通NFA略有不同,为五元组:A = (Q, E, θ, q1, F)
说明如下:

  • Q: 表示状态集合 a set of states
  • E: 表示状态转移的有向边集合 a set of directed edges
  • θ: 表示状态转移的公式集合,与E共同作用 a set of formulas
  • q1: 表示边的初始状态;labelling those edges a start state
  • F: 表示最终状态 a final state

下面通过实例来构造NFAbAutomaton。
先通过SASE+语言]定义如下的股票趋势事件模式:

PATTERN SEQ(Stock+ a[ ], Stock b)
WHERE skip_till_next_match(a[ ], b) {
 [symbol]
 and a[1].volume > 1000
 and a[i].price > avg(a[..i-1].price)
 and b.volume < 80%*a[a.LEN].volume }
WITHIN 1 hour

此事件模式以1小时作为时间窗口的长度,以“交易量大于1000”作为匹配序列的起始,且要求序列中股票的最近价格必须高于之前所有交易价格的均值,当检测到该股票的交易量下跌到最近一次交易量的80%以下时,则成功匹配整个事件模式。

根据此股票趋势事件模式,构造出NFAbAutomaton,如下图所示。这也是Flink CEP中NFACompiler组件需要做的事情。
在这里插入图片描述
匹配序列a[]的生成过程 就是构造符合谓词约束的事件的Kleene +闭包的过程
NFAbAutomaton的每个状态都有各自的匹配缓存,用于在运行时存储当前的匹配结果。

the start state, a[1], is where the matching process begins. It awaits input to start the Kleene plus and to select an event into the a[1] unit of the match buffer. At the next state a[i], it attempts to select another event into the a[i] (i > 1) unit of the buffer. The subsequent state b denotes that the matching process has fulfilled the Kleene plus (for a particular match) and is ready to process the next pattern component. The final state, F, represents the completion of the process, resulting in the creation of a pattern match.

  • 初始状态a[1]是匹配过程开始的位置,即NFAbAutomaton的初始状态。它等待输入以启动Kleene plus并将一个事件【交易量大于1000的事件】选择到匹配缓冲区的a[1]单元中。
  • 在下一个状态a[i]时,它尝试选择另一个事件【最近价格高于之前所有交易价格的均值】到缓冲区的a[i] (i > 1)单元中。a[i]是正在构造Kleene +的状态
  • 随后的状态b表示匹配过程已经完成了Kleene +(对于特定的匹配)闭包,并准备处理下一个模式组件【交易量下跌到最近一次交易量的80%以下】。
  • 最后的状态F表示流程的完成,从而创建模式匹配。

2.2. 状态转移语义

复杂事件模式的匹配过程:本质上是输入事件流驱动NFAbAutomaton进行状态转移的过程

根据θ集合定义的条件,在有向边集合E上可以定义4种状态转移语义:

  • begin:消费输入事件,存入缓存,并转移到下一个状态;
  • take:消费输入事件,存入缓存,并保持当前状态;
  • ignore:忽略输入事件,不存入缓存,并保持当前状态;
  • proceed:感知输入事件,转移到下一个状态,同时保留该事件给下一个状态处理。

结合这4种状态转移语义,就可以读懂上图中的转移公式了。

FlinkCEP的StateTransitionAction定义中没有begin语义,仅有take、ignore和proceed语义,但是它和NFAbAutomaton是等价的

/** Set of actions when doing a state transition from a {@link State} to another. */
public enum StateTransitionAction {
    TAKE, // take the current event and assign it to the current state
    IGNORE, // ignore the current event
    PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
}

2.3. 事件选择策略

事件选择策略:指选择符合条件的事件进入正闭包——即扩展匹配序列的方法。在时间窗口的限制之内,常用的有以下三种策略。

  • Strict contiguity(严格连续):在最严格的事件选择策略中,两个选定的事件必须在输入流中是连续的。这种要求在正则表达式匹配字符串、DNA序列等时很常见。
    • 严格按顺序选择所有符合条件的事件,途中不能出现不符合条件的事件
    • 对应FlinkCEP API中的Pattern.next()/notNext();
  • Partition contiguity:两个选定的事件不需要是连续的; 但是如果事件是根据一个条件在概念上划分的,那么在同一分区中,下一个相关事件必须与前一个相关事件相邻
    • 例如示例中的[symbol],通常用于形成分区。然而如果事件模式的目的是检测价格上涨的总体趋势,尽管存在一些局部波动值,那么分区相邻可能不够灵活导致无法支持。
    • 对应FlinkCEP在键控输入流【keyed input stream】上使用Strict contiguity
  • skip till next match(宽松连续):进一步宽松,完全删除连续性要求: 所有不相关的事件将被跳过,直到读取下一个相关事件
    • 按顺序选择所有符合条件的事件,而途中不符合条件的事件被忽略,
    • 对应FlinkCEP API中的Pattern.followedBy()/notFollowedBy()。上述SASE+语言描述的pattern使用的就是这个策略;
  • skip till any match(可变宽松连续):Finally, skip till any match relaxes the previous one by further allowing non-deterministic actions on relevant events
    • 在skip till next match的基础上,还允许忽略一些符合条件的事件,以尽量延长匹配序列的长度,
    • 对应FlinkCEP API中的Pattern.followedByAny()。

以skip till next match策略为例,给出如下的示例数据,可以产生3个匹配序列R1、R2、R3,如图所示。
在这里插入图片描述

2.3. 共享版本匹配缓存

仍然考虑上一节的图,回顾一下a[i]状态的take和proceed转移逻辑:

θ*a[i]_take = θa[i]_take ∧ a[i].time<a[1].time+1 hour
θ*a[i]_proceed = θb_begin ∨ (¬θ*a[i]_take ∧ ¬θ*a[i]_ignore)

可见,在e6到达NFA时,可以同时满足a[i]_take和a[i]_proceed的转移(这里正好体现出了NFA的不确定性),所以原本的一个序列会在此分裂成两个:其中一个(R1)终止匹配,另一个(R3)继续匹配。同理,当e3到达NFA时,同时满足a[1]_begin和a[i]_take的转移,所以又会出现一个序列R2。

由上可知,这些序列之间的重合是比较大的,如果都按原样存储在匹配缓存中,会造成比较大的膨胀。为了避免这个问题,论文中设计了一种科学的缓存结构,称为shared versioned match buffer,即“共享版本匹配缓存”,如下图所示:
在这里插入图片描述
其中图a、b、c是原始的R1、R2、R3缓存,图d则是整合在一起的共享版本缓存
它会将所有序列的前向指针附加上一个版本号(采用杜威十进制法,点号分隔),并且遵循以下两个规则:

  • 迁移到下一个状态时,版本号增加一位,如a[1]状态的版本号是1(为了符合习惯写作1.0),a[i]状态的版本号是1.0、1.1,b状态的版本号是1.0.0、1.1.0……以此类推;
  • 当序列发生分裂时,处于当前状态的版本号位加1。例如e3事件产生了2.0版本,e6事件产生了1.1版本。
    依照这种规则,就可以根据前向指针上版本号的递增规律和前缀来回溯出正确的序列了。
    FlinkCEP中将此缓存设计为SharedBuffer类,但是版本的设计有些不同。

2.4. 计算状态

对于每一个序列,NFAbAutomaton还需要维护一些最基础的状态数据,以方便执行状态转移和匹配逻辑,论文中将其称为computation state,即计算状态。

基础的计算状态结构如下图所示,包含以下数据项:

  • 当前的版本号;
  • 当前的状态;
  • 指向匹配缓存中最近一个事件的指针;
  • 整个序列的起始时间;
  • 其他必要的上下文数据存储。以股票趋势数据为例,会维护Kleene +闭包内的事件数、价格之和以及交易量等。

运行计算状态【Computation state of runs】如下:
在这里插入图片描述
Flink CEP框架用ComputationState类来维护计算状态,大体思路与论文相同。