模式API
1. 个体模式
模式(Pattern)其实就是将一组简单事件组合成复杂事件的“匹配规则”。由于流中事件的匹配是有先后顺序的,因此一个匹配规则就可以表达成先后发生的一个个简单事件,按顺序串联组合在一起。
这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern),例如上面的例子,需要匹配三次连续失败的用户,实际上就是三个个体模式。
2. 量词
个体模式后面可以跟一个“量词”,用来指定循环的次数,在上面例子中,可以在begin或者next之后接量词,注意量词的使用位置。
在Flink CEP中,可以使用不同的方法指定循环模式:
// 匹配事件出现 4 次
pattern.times(4);
// 匹配事件出现 4 次,或者不出现
pattern.times(4).optional();
// 匹配事件出现 2, 3 或者 4 次
pattern.times(2, 4);
// 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配,有4次匹配4次
pattern.times(2, 4).greedy();
// 匹配事件出现 2, 3, 4 次,或者不出现
pattern.times(2, 4).optional();
// 匹配事件出现 2, 3, 4 次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现 1 次或多次
pattern.oneOrMore();
// 匹配事件出现 1 次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();
// 匹配事件出现 1 次或多次,或者不出现
pattern.oneOrMore().optional();
// 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现 2 次或多次
pattern.timesOrMore(2);
// 匹配事件出现 2 次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();
// 匹配事件出现 2 次或多次,或者不出现
pattern.timesOrMore(2).optional()
// 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();
2.1 oneOrMore()
匹配事件出现一次或多次,假设a是一个个体模式,a.oneOrMore()表示可以匹配1个或多个a的事件组合,匹配结果有三个[a,a,a]、[a,a]、[a],也是说oneOrMore会以每一个匹配的事件为开头,返回最大的匹配项,第一个a,它可以匹配三个a,结束,轮到第二个a,它可以匹配两个a(第二个a本身和第三个a),结束,轮到最后一个a,只有它自己了,结束。
2.2 times(times)
匹配事件发生特定次数(times),例如 a.times(3)表示aaa
2.3 times(fromTimes,toTimes)
指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 3)可以匹配aa,aaa
2.4 greedy()
只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如a.times(2, 4).greedy(),如果出现了连续4个a,那么会直接把aaaa检测出来进行处理,其他任意2个a是不算匹配事件的。
2.5 optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
3. 条件where
对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。FlinkCEP会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。
3.1 限定子类型
调用subtype()
方法可以为当前模式增加子类型限制条件:
pattern.subtype(SubEvent.class);
这里SubEvent是流中数据类型Event的子类型。这时只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件。
3.2 简单条件(Simple Conditions)
简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter操作。代码中我们为where()
方法传入一个SimpleCondition的实例作为参数。SimpleCondition是表示“简单条件”的抽象类,内部有一个filter()
方法,唯一的参数就是当前事件。所以它可以当作FilterFunction来使用。
3.3 迭代条件(Iterative Conditions)
在 Flink CEP 中,提供了 IterativeCondition 抽象类。这其实是更加通用的条件表达,查看源码可以发现, .where()方法本身要求的参数类型就是 IterativeCondition;而之前 的SimpleCondition 是它的一个子类
在 IterativeCondition 中同样需要实现一个 filter()方法,不过与 SimpleCondition 中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。
3.4 组合条件(Combining Conditions)
独立定义多个条件,然后在外部把它们连接起来,构成一个“组合条件”(Combining Condition) 最简单的组合条件,就是where()
后面再接一个where()
。因为前面提到过,一个条件就像是一个filter操作,所以每次调用where()
方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”(AND)。而多个条件的逻辑或(OR),则可以通过where()
后加一个or()
来实现。这里的or()
方法与where()
一样,传入一个IterativeCondition作为参数,定义一个独立的条件;它和之前where()
定义的条件只要满足一个,当前事件就可以成功匹配。
3.5 终止条件(Stop Conditions)
对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了终止条件的定义是通过调用模式对象的until()
方法来实现的,同样传入一个IterativeCondition作为参数。需要注意的是,终止条件只与oneOrMore()
或者oneOrMore().optional()
结合使用。因为在这种循环模式下,我们不知道后面还有没有事件可以匹配,只好把之前匹配的事件作为状态缓存起来继续等待,这等待无穷无尽;如果一直等下去,缓存的状态越来越多,最终会耗尽内存。所以这种循环模式必须有个终点,当until()
指定的条件满足时,循环终止,这样就可以清空状态释放内存了。
4. 组合模式
有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。
组合模式就是一个“模式序列”,是用诸如begin、next、followedBy 等表示先后顺序的“连接词”将个体模式串连起来得到的。在这样的语法调用中,每个事件匹配的条件是什么、各个事件之间谁先谁后、近邻关系如何都定义得一目了然。每一个“连接词”方法调用之后,得到的都仍然是一个Pattern的对象;所以从Java对象的角度看,组合模式与个体模式是一样的,都是Pattern。
4.1 初始模式(Initial Pattern)
所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用Pattern的静态方法begin()
来创建。如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
这里我们调用 Pattern 的.begin()方法创建了一个初始模式。传入的 String 类型的参数就是模式的名称;而 begin 方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,这里我们定义为 Event。调用的结果返回一个 Pattern 的对象实例。Pattern 有两个泛型参数,第一个就是检测事件的基本类型 Event,跟 begin 指定的类型一致;第二个则是当前模式里事件的子类型,由子类型限制条件指定。我们这里用类型通配符(?)代替,就可以从上下文直接推断了。
4.2 近邻条件(Contiguity Conditions)
在初始模式之后,我们就可以按照复杂事件的顺序追加模式,组合成模式序列了。模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)
Flink CEP 中提供了三种近邻关系:严格近邻(Strict Contiguity)、宽松近邻(Relaxed Contiguity)、非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)
4.3 严格近邻
匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。代码中对应的就是Pattern的next()
方法,名称上就能看出来,“下一个”自然就是紧挨着的。
4.4 宽松近邻
宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应followedBy()
方法,很明显这表示“跟在后面”就可以,不需要紧紧相邻。
4.5 非确定性宽松近邻
从全局找所有符合匹配模式的事件队列。
4.6 其他限制条件
除了上面提到的next()
、followedBy()
、followedByAny()
可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:
- notNext() 表示前一个模式匹配到的事件后面,不能紧跟着某种事件。
- notFollowedBy()
表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()
是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以notFollowedBy()
结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。 - within()
方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。一个模式序列中只能有一个时间限制,调用within()
的位置不限;如果多次调用则会以最小的那个时间间隔为准。
4.7 循环模式中的近邻条件
在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于定义了量词(如oneOrMore()
、times()
)的循环模式,默认内部采用的是宽松近邻。也就是说,当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用followedBy()
连接起来。
- consecutive()
循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()
一样,需要与循环量词times()
、oneOrMore()
配合使用。 - allowCombinations()
除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。这需要调用allowCombinations()
方法来实现,实现的效果与followedByAny()
相同。
4.8 匹配后跳过策略
在Flink CEP中,由于有循环模式和非确定性宽松近邻的存在,同一个事件有可能会重复利用,被分配到不同的匹配结果中。这样会导致匹配结果规模增大,有时会显得非常冗余。当然,非确定性宽松近邻条件,本来就是为了放宽限制、扩充匹配结果而设计的;我们主要是针对循环模式来考虑匹配结果的精简。之前已经讲过,如果对循环模式增加了greedy()
的限制,那么就会“尽可能多地”匹配事件,这样就可以砍掉那些子集上的匹配了。不过这种方式还是略显简单粗暴,如果我们想要精确控制事件的匹配应该跳过哪些情况,那就需要制定另外的策略了。在Flink CEP中,提供了模式的“匹配后跳过策略”(After Match Skip Strategy),专门用来精准控制循环模式的匹配结果。这个策略可以在Pattern的初始模式定义中,作为begin()
的第二个参数传入:
Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
.where(...)
具体的跳过策略有5种:
public abstract class AfterMatchSkipStrategy implements Serializable {
private static final long serialVersionUID = -4048930333619068531L;
public static SkipToFirstStrategy skipToFirst(String patternName) {
return new SkipToFirstStrategy(patternName, false);
}
public static SkipToLastStrategy skipToLast(String patternName) {
return new SkipToLastStrategy(patternName, false);
}
public static SkipPastLastStrategy skipPastLastEvent() {
return SkipPastLastStrategy.INSTANCE;
}
public static AfterMatchSkipStrategy skipToNext() {
return SkipToNextStrategy.INSTANCE;
}
public static NoSkipStrategy noSkip() {
return NoSkipStrategy.INSTANCE;
}
}
- 不跳过(NO_SKIP)
代码调用AfterMatchSkipStrategy.noSkip()
。这是默认策略,所有可能的匹配都会输出 - 跳至下一个(SKIP_TO_NEXT)
代码调用AfterMatchSkipStrategy.skipToNext()
,找到一个匹配项,跳至下一个元素 - 跳过所有子匹配(SKIP_PAST_LAST_EVENT)
代码调用AfterMatchSkipStrategy.skipPastLastEvent()
,只返回第一个匹配也即跳过所有子匹配,这是最为精简的跳过策略 - 跳至第一个(SKIP_TO_FIRST[])
代码调用AfterMatchSkipStrategy.skipToFirst(“second”)
,这里传入一个参数,指明跳至哪个模式的第一个匹配事件 - 跳至最后一个(SKIP_TO_LAST[])
代码调用AfterMatchSkipStrategy.skipToLast(“second”)
,同样传入一个参数,指明跳至哪个模式的最后一个匹配事件
5. 模式的检测处理
5.1 模式应用到数据流
将模式应用到事件流上的代码非常简单,只要调用CEP类的静态方法pattern()
,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:
DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
这里的DataStream,也可以通过keyBy进行按键分区得到KeyedStream,接下来对复杂事件的检测就会针对不同的key单独进行了。 模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。对于时间戳相同或是同时到达的事件,我们还可以在CEP.pattern()
中传入一个比较器作为第三个参数,用来进行更精确的排序:
// 可选的事件比较器
EventComparator<Event> comparator = ...
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
得到PatternStream后,接下来要做的就是对匹配事件的检测处理了。
5.2 处理匹配事件
基于PatternStream可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终得到一个正常的DataStream。这个转换的过程与窗口的处理类似:将模式应用到流上得到PatternStream,就像在流上添加窗口分配器得到WindowedStream;而之后的转换操作,就像定义具体处理操作的窗口函数,对收集到的数据进行分析计算,得到结果进行输出,最后回到DataStream 的类型来 PatternStream的转换操作主要可以分成两种:简单便捷的选择提取(select)操作,和更加通用、更加强大的处理(process)操作。与DataStream的转换类似,具体实现也是在调用API时传入一个函数类:选择操作传入的是一个 PatternSelectFunction,处理操作传入的则是一个PatternProcessFunction
5.3 匹配事件的选择提取(select)
处理匹配事件最简单的方式,就是从PatternStream中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select) 代码中基于PatternStream直接调用
select()
方法,传入一个 PatternSelectFunction 作为参数:
public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
OUT select(Map<String, List<IN>> var1) throws Exception;
}
它会将检测到的匹配事件保存在一个Map里,对应的key就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在Map里的value就是一个事件的列表(List)
5.4 flatSelect
PatternStream 还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction的“扁平化”版本;内部需要实现一个
flatSelect()
方法,它与之前 select()
的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用out.collet()
方法就可以实现多次发送输出数据了
public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {
void flatSelect(Map<String, List<IN>> var1, Collector<OUT> var2) throws Exception;
}
5.5 process
@PublicEvolving
public abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
public PatternProcessFunction() {
}
public abstract void processMatch(Map<String, List<IN>> var1, Context var2, Collector<OUT> var3) throws Exception;
public interface Context extends TimeContext {
<X> void output(OutputTag<X> var1, X var2);
}
}
5.6 处理超时事件
复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下: (1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的Map中。
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction的processMatch()
方法进行处理。
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件。
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。
不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用within()
指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种“超时失败”跟真正的“匹配失败”不同,它其实是一种“部分成功匹配”;因为只有在开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。
5.7 使用PatternProcessFunction的侧输出流
在Flink CEP中,提供了一个专门捕捉超时的部分匹配事件的接口 , 叫 作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()
方法,可以将超时的、已检测到的部分匹配事件放在一个Map中,作为方法的第一个参数;方法的第二个参数则是PatternProcessFunction的上下文Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。代码中的调用方式如下:
class MyPatternProcessFunction extends PatternProcessFunction<Event, String> implements TimedOutPartialMatchHandler<Event> {
// 正常匹配事件的处理
@Override
public void processMatch(Map<String, List<Event>> match, Context ctx,
Collector<String> out) throws Exception{
...
}
// 超时部分匹配事件的处理
@Override
public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx)
throws Exception{
Event startEvent = match.get("start").get(0);
OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){};
ctx.output(outputTag, startEvent);
}
}
我们在processTimedOutMatch()
方法中定义了一个输出标签(OutputTag)。调用ctx.output()
方法,就可以将超时的部分匹配事件输出到标签所标识的侧输出流了