目录

入门指南

 Pattern API

单个模式

Quantifiers

条件

组合模式

循环模式中的连续性

模式组

匹配后跳过策略 

检测模式

从模式选择

处理超时的部分模式

简单的API

 CEP library 中的时间

Event Time处理迟到的数据

时间Context

Examples

从较老的Flink版本迁移(1.3之前)

迁移到1.4 +

迁移到1.3 +


FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。在无穷无尽的事件流中检测事件模式,获得数据中重要的内容。

这个页面描述了Flink CEP中可用的API调用。首先介绍Pattern API,它允许指定希望在流中检测的模式,然后介绍如何在匹配事件序列时进行检测和操作。然后,介绍CEP库在处理事件时间延迟时所做的假设,以及如何从较早的Flink版本迁移到Flink-1.3。

入门指南

将 FlinkCEP 依赖项添加到项目的pom.xml中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

 注意:FlinkCEP的 jar 不是Flink安装包的一部分,将FlinkCEP加到程序中:Flink基础之配置Maven依赖、连接、库

请注意,要应用模式匹配的 DataStream 中的事件必须实现正确的 equals()和 hashCode()方法,因为 FlinkCEP 使用这两个方法来比较和匹配事件。

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
        override def processMatch(
              `match`: util.Map[String, util.List[Event]],
              ctx: PatternProcessFunction.Context,
              out: Collector[Alert]): Unit = {
            out.collect(createAlertFrom(pattern))
        }
    })

 Pattern API

Pattern API 允许定义要从输入流中提取的复杂模式序列。

每个复杂模式序列由多个简单模式组成,即寻找具有相同属性的单个事件的模式。从现在开始,我们将把这些简单的模式称为模式,以及我们在流中搜索的最终复杂模式序列称为模式序列。您可以将模式序列视为此类模式的图,其中从一个模式到下一个模式的转换是基于用户指定的条件进行的,例如event.getName().equals("end")。匹配是一个输入事件序列,它通过一个有效的序列访问复杂模式图的所有模式。

注意:

每个模式必须有一个惟一的名称,用来标识匹配的事件。

模式名称不能包含字符“:”。

在本节的其余部分中,我们将首先描述如何定义单个模式,然后介绍如何将单个模式组合成复杂模式。

单个模式

模式可以是单例模式,也可以是循环模式。单例模式接受单个事件,而循环模式可以接受多个事件。在模式匹配符号中,“a b+ c?”d"(或"a",后面跟着一个或多个"b",后面可选跟着一个"c",后面跟着一个"d"), a, c?, d是单例模式,而b+是循环模式。默认情况下,模式是单例模式,您可以使用 Quantifiers 将其转换为循环模式。每个模式都可以基于一个或多个接受事件的条件。

Quantifiers

在FlinkCEP中,可以使用 pattern.oneOrMore() 方法指定循环模式,用于期望给定事件发生一次或多次的模式(例如前面提到的b+);使用 pattern.times(#ofTimes) 方法对于期望某一特定类型事件发生特定次数的模式,例如4 a;使用 pattern.times(#fromTimes, #toTimes) 方法对于期望给定类型事件的特定最小出现次数和最大出现次数的模式,例如2-4 as。

可以使用 pattern.greedy() 方法重复循环模式,但还不能重复组模式。可以使用 pattern.optional() 方法将所有模式(无论是否循环)设置为可选的。

对于名为 start 的模式,以下是有效的 quantifiers:

// expecting 4 occurrences
 start.times(4)

 // expecting 0 or 4 occurrences
 start.times(4).optional()

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4)

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy()

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional()

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy()

 // expecting 1 or more occurrences
 start.oneOrMore()

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy()

 // expecting 0 or more occurrences
 start.oneOrMore().optional()

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy()

 // expecting 2 or more occurrences
 start.timesOrMore(2)

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy()

 // expecting 0, 2 or more occurrences
 start.timesOrMore(2).optional()

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy()

条件

对于每个模式,你可以指定一个条件,一个传入的事件必须满足,以便被模式“接受”,例如,它的值应该大于5,或大于先前接受的事件的平均值。可以通过 pattern.where()、pattern.or() 或 pattern.until() 方法指定事件属性的条件。可以是 IterativeConditions,也可以是 SimpleConditions。

Iterative Conditions:这是最常见的情况。这就是您如何指定一个条件来接受后续事件的方法,该条件基于先前接受的事件的属性或其中一个子集上的统计信息。

下面是一个迭代条件的代码,它接受名为“middle”的模式的下一个事件,如果其名称以“foo”开头,并且该模式之前接受的事件的价格与当前事件的价格之和不超过5.0的值。迭代条件可能非常强大,特别是与循环模式相结合时,例如oneOrMore()。

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

对ctx.getEventsForPattern(…)的调用将为给定的潜在匹配查找以前接受的所有事件。此操作的成本可能有所不同,因此在实现您的条件时,请尽量减少使用它。

描述的上下文还提供了对事件时间特征的访问。

Simple Conditions:这种类型的条件扩展了前面提到的 IterativeCondition 类,并仅根据事件本身的属性来决定是否接受事件。

start.where(event => event.getName.startsWith("foo"))

最后,还可以通过pattern.subtype(子类)方法将接受的事件类型限制为初始事件类型(这里是事件)的子类型。

start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

Combining Conditions:如上所示,可以将子类型条件与其他条件组合起来。这适用于任何情况。您可以通过顺序调用where()来任意组合条件。最终的结果将是符合逻辑的,并且是各个条件的结果。要使用OR组合条件,可以使用OR()方法,如下所示。

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

Stop condition:对于循环模式(oneOrMore()和oneOrMore().optional()),您还可以指定一个停止条件,例如,接受大于5的事件,直到值的和小于50。

为了更好地理解它,请看下面的示例。鉴于

  • 例如“(a+直到b)”(一个或多个a直到b)
  • 传入事件序列“a1”“c”“a2”“b”“a3”
  • 库将输出结果:{a1 a2} {a1} {a2} {a3}。

可以看到,由于停止条件,{a1a2a3}或{a2a3}没有返回。

Pattern OperationDescription
where(condition)

定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的where()子句导致它们的条件被关联:

pattern.where(event => ... /* some condition */)
or(condition)

添加一个与现有条件相匹配的新条件。事件只有通过至少一个条件才能匹配模式:

pattern.where(event => ... /* some condition */)
    .or(event => ... /* alternative condition */)
until(condition)

指定循环模式的停止条件。这意味着,如果发生与给定条件匹配的事件,则不再接受模式中的事件。

只适用于oneOrMore()方法

NOTE: 它允许在基于事件的条件下清除相应模式的状态。

pattern.oneOrMore().until(event => ... /* some condition */)
subtype(subClass)

定义当前模式的子类型条件。一个事件只能匹配模式,如果它是这个子类型:

pattern.subtype(classOf[SubEvent])
oneOrMore()

指定此模式期望匹配事件至少出现一次。

默认情况下使用松散的内部连续(在后续事件之间)。

NOTE: 建议使用until()或within()来启用状态清除

pattern.oneOrMore()
timesOrMore(#times)

指定此模式期望匹配事件至少出现#次。

默认情况下使用松散的内部连续(在后续事件之间)。

pattern.timesOrMore(2)
times(#ofTimes)

指定此模式期望匹配事件的精确出现次数。

默认情况下使用松散的内部连续(在后续事件之间)。

pattern.times(2)
times(#fromTimes, #toTimes)

指定此模式期望匹配事件在#fromTimes和#toTimes之间出现。

默认情况下使用松散的内部连续(在后续事件之间)。

pattern.times(2, 4)
optional()

指定此模式是可选的,即它可能根本不会发生。这适用于前面提到的所有量词。

pattern.oneOrMore().optional()
greedy()

指定此模式是贪婪的,即它将重复尽可能多的。这仅适用于量词,目前不支持组模式。

pattern.oneOrMore().greedy()

组合模式

既然您已经了解了单个模式的外观,现在就该看看如何将它们组合成完整的模式序列。

一个模式序列必须从一个初始模式开始,如下图所示:

val start : Pattern[Event, _] = Pattern.begin("start")

接下来,您可以通过指定模式序列之间所需的连续条件,向模式序列追加更多的模式。FlinkCEP支持以下事件间的连续性形式:

  1. Strict Contiguity:期望所有匹配的事件严格地一个接一个出现,中间不包含任何不匹配的事件。
  2. Relaxed Contiguity:忽略在匹配事件之间出现的非匹配事件。
  3. Non-Deterministic Relaxed Contiguity:进一步放宽了连续性,允许忽略一些匹配事件的附加匹配。

要在连续模式之间应用它们,您可以使用:

  1. next(),for strict
  2. followedBy(),for relaxed
  3. followedByAny(),for non-deterministic relaxed contiguity
  4. 或者
  5. notNext(),如果不希望事件类型直接跟随另一个事件类型
  6. notFollowedBy(),如果不希望事件类型位于其他两个事件类型之间。

模式序列不能以notFollowedBy()结束。

NOT模式之前不能有一个可选的模式。

// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

放宽的邻近性意味着只匹配第一个后续匹配事件,而对于非确定性放宽的邻近性,同一个开始将发出多个匹配。例如,一个模式“a b”,给定事件序列“a”、“c”、“b1”、“b2”,将得到以下结果:

  1. “a”和“b”之间的严格连续性:{}(没有匹配),“a”后面的“c”导致“a”被丢弃。
  2. “a”和“b”之间的松弛连续性:{a b1},因为松弛连续性被视为“跳过非匹配事件,直到下一个匹配事件”。
  3. “a”和“b”之间的非确定性松弛邻近:{a b1}, {a b2},因为这是最普遍的形式。

还可以为模式定义有效的时间约束。例如,您可以通过pattern.within()方法定义一个模式应该在10秒内发生。processing 和 event time 都支持时态模式。

注意:一个模式序列只能有一个时间约束。如果在不同的模式上定义了多个这样的约束,则应用最小的约束。

next.within(Time.seconds(10))

循环模式中的连续性

您可以在循环模式中应用与前一节中讨论的相同的连续性条件。在这种模式中接受的元素之间将应用连续性。为举例说明上述情况,输入“a”、“b1”、“d1”、“b2”、“d2”、“b3”、“c”的模式序列“ab + c”(“a”后面跟着一个或多个“b”的任意(非确定性松弛)序列,其结果如下:

  1. 严格的连续性:{a b3 c}——b1之后的“d1”导致“b1”被丢弃,同样的情况也发生在“b2”上,因为“d2”的存在。
  2. 松弛邻近:{a b1 c}、{a b1 b2 c}、{a b1 b2 b3 c}、{a b2 c}、{a b2 b3 c}、{a b3 c} -“d”被忽略。
  3. 非确定性松弛邻近:{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b3 c} -注意{a b1 b3 c},这是“b”之间的松弛邻近的结果。

对于循环模式(例如oneOrMore()和times()),默认是松散的连续性。如果你想要严格的连续性,你必须使用consecutive()调用来明确地指定它,如果你想要非确定性的松散的连续性,你可以使用allowCombinations()调用。

Pattern OperationDescription
consecutive()

Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as in next()).

If not applied a relaxed contiguity (as in followedBy()) is used.

E.g. a pattern like:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().consecutive()
  .followedBy("end1").where(_.getName().equals("b"))

Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

allowCombinations()

Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity between the matching events (as in followedByAny()).

If not applied a relaxed contiguity (as in followedBy()) is used.

E.g. a pattern like:

Pattern.begin("start").where(_.getName().equals("c"))
  .followedBy("middle").where(_.getName().equals("a"))
                       .oneOrMore().allowCombinations()
  .followedBy("end1").where(_.getName().equals("b"))

Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

模式组

也可以将模式序列定义为begin、followedBy、followedByAny和next的条件。模式序列将被视为逻辑上的匹配条件,并返回GroupPattern,可以将oneOrMore()、times(#ofTimes)、times(#fromTimes、#toTimes)、optional()、continuous()、allow()应用到GroupPattern。

val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)

// strict contiguity
val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
Pattern OperationDescription
begin(#name)

Defines a starting pattern:

val start = Pattern.begin[Event]("start")
begin(#pattern_sequence)

Defines a starting pattern:

val start = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
next(#name)

Appends a new pattern. A matching event has to directly succeed the previous matching event (strict contiguity):

val next = start.next("middle")
next(#pattern_sequence)

Appends a new pattern. A sequence of matching events have to directly succeed the previous matching event (strict contiguity):

val next = start.next(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
followedBy(#name)

Appends a new pattern. Other events can occur between a matching event and the previous matching event (relaxed contiguity) :

val followedBy = start.followedBy("middle")
followedBy(#pattern_sequence)

Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event (relaxed contiguity) :

val followedBy = start.followedBy(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
followedByAny(#name)

Appends a new pattern. Other events can occur between a matching event and the previous matching event, and alternative matches will be presented for every alternative matching event (non-deterministic relaxed contiguity):

val followedByAny = start.followedByAny("middle")
followedByAny(#pattern_sequence)

Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event, and alternative matches will be presented for every alternative sequence of matching events (non-deterministic relaxed contiguity):

val followedByAny = start.followedByAny(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...)
)
notNext()

Appends a new negative pattern. A matching (negative) event has to directly succeed the previous matching event (strict contiguity) for the partial match to be discarded:

val notNext = start.notNext("not")
notFollowedBy()

Appends a new negative pattern. A partial matching event sequence will be discarded even if other events occur between the matching (negative) event and the previous matching event (relaxed contiguity):

val notFollowedBy = start.notFollowedBy("not")
within(time)

Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:

pattern.within(Time.seconds(10))

匹配后跳过策略 

对于给定的模式,可以将相同的事件分配给多个成功匹配的事件。要控制分配给事件的匹配数量,需要指定称为AfterMatchSkipStrategy的跳过策略。跳跃策略有五种类型,具体如下:

  • NO_SKIP: Every possible match will be emitted.
  • SKIP_TO_NEXT: Discards every partial match that started with the same event, emitted match was started.
  • SKIP_PAST_LAST_EVENT: Discards every partial match that started after the match started but before it ended.
  • SKIP_TO_FIRST: Discards every partial match that started after the match started but before the first event of PatternName occurred.
  • SKIP_TO_LAST: Discards every partial match that started after the match started but before the last event of PatternName occurred.

注意:在使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时,还应该指定一个有效的模式名。

例如,对于给定的模式b+ c和数据流b1 b2 b3 c,这四种跳跃策略的区别如下:

Skip StrategyResultDescription
NO_SKIPb1 b2 b3 c
b2 b3 c
b3 c
After found matching b1 b2 b3 c, the match process will not discard any result.
SKIP_TO_NEXTb1 b2 b3 c
b2 b3 c
b3 c
After found matching b1 b2 b3 c, the match process will not discard any result, because no other match could start at b1.
SKIP_PAST_LAST_EVENTb1 b2 b3 cAfter found matching b1 b2 b3 c, the match process will discard all started partial matches.
SKIP_TO_FIRST[b]b1 b2 b3 c
b2 b3 c
b3 c
After found matching b1 b2 b3 c, the match process will try to discard all partial matches started before b1, but there are no such matches. Therefore nothing will be discarded.
SKIP_TO_LAST[b]b1 b2 b3 c
b3 c
After found matching b1 b2 b3 c, the match process will try to discard all partial matches started before b3. There is one such match b2 b3 c

还可以看看另一个示例,以更好地了解NO_SKIP和SKIP_TO_FIRST: Pattern (a | b | c) (b | c) c+之间的区别。贪心d和序列:a b c1 c2 c3 d,则结果将为:

Skip StrategyResultDescription
NO_SKIPa b c1 c2 c3 d
b c1 c2 c3 d
c1 c2 c3 d
After found matching a b c1 c2 c3 d, the match process will not discard any result.
SKIP_TO_FIRST[c*]a b c1 c2 c3 d
c1 c2 c3 d
After found matching a b c1 c2 c3 d, the match process will discard all partial matches started before c1. There is one such match b c1 c2 c3 d.

为了更好地理解NO_SKIP和SKIP_TO_NEXT之间的区别,请看下面的例子:

Skip StrategyResultDescription
NO_SKIPa b1
a b1 b2
a b1 b2 b3
After found matching a b1, the match process will not discard any result.
SKIP_TO_NEXTa b1After found matching a b1, the match process will discard all partial matches started at a. This means neither a b1 b2 nor a b1 b2 b3 could be generated.

要指定使用哪个跳过策略,只需通过调用以下命令创建一个 AfterMatchSkipStrategy:

FunctionDescription
AfterMatchSkipStrategy.noSkip()Create a NO_SKIP skip strategy
AfterMatchSkipStrategy.skipToNext()Create a SKIP_TO_NEXT skip strategy
AfterMatchSkipStrategy.skipPastLastEvent()Create a SKIP_PAST_LAST_EVENT skip strategy
AfterMatchSkipStrategy.skipToFirst(patternName)Create a SKIP_TO_FIRST skip strategy with the referenced pattern name patternName
AfterMatchSkipStrategy.skipToLast(patternName)Create a SKIP_TO_LAST skip strategy with the referenced pattern name patternName

然后通过调用以下方法来应用该策略:

val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

注意:对于SKIP_TO_FIRST/LAST,有两个选项用于处理没有元素映射到指定变量的情况。在这种情况下,默认情况下将使用NO_SKIP策略。另一个选项是在这种情况下抛出异常。可以通过以下方式启用此选项:

AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()

检测模式

指定了要查找的模式序列之后,就可以将其应用到输入流中,以检测潜在的匹配。要根据模式序列运行事件流,必须创建一个 PatternStream。给定一个输入流输入,一个模式和一个可选的 comparator 用于排序事件具有相同的时间戳,以防 EventTime 在同一时刻到达,您可以通过调用创建模式流:

val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

输入流可以是键控的,也可以是非键控的,这取决于您的用例。

注意:将您的模式应用于非键控流将导致并行度等于1的作业。

从模式选择

一旦您获得了一个 PatternStream,您就可以对检测到的事件序列应用转换。建议的方法是使用PatternProcessFunction。

PatternProcessFunction 有一个 processMatch 方法,它为每个匹配的事件序列调用。它以 Map<String, List< in >>的形式接收匹配,其中键是模式序列中每个模式的名称,值是该模式的所有可接受事件的列表(in是输入元素的类型)。给定模式的事件按时间戳排序。为每个模式返回可接受事件列表的原因是,在使用循环模式(例如 oneToMany()和 times())时,可能会为给定模式接受多个事件。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

PatternProcessFunction 提供对上下文对象的访问。由于它,人们可以访问与时间相关的特征,如 currentProcessingTime 或当前匹配的时间戳(它是分配给匹配的最后一个元素的时间戳)。

处理超时的部分模式

只要模式通过 within 关键字附加了窗口长度,就有可能丢弃部分事件序列,因为它们超出了窗口长度。要处理超时的部分匹配,可以使用 TimedOutPartialMatchHandler 接口。这个接口应该以mixin风格使用。这意味着您可以使用 PatternProcessFunction 来实现这个接口。TimedOutPartialMatchHandler 提供了额外的 processTimedOutMatch方法,该方法将在每次超时部分匹配时调用。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

 注意:processTimedOutMatch不允许访问主输出。尽管如此,您仍然可以通过副输出(通过 Context 对象)发出结果。

简单的API

前面提到的 PatternProcessFunction 是在Flink 1.8中引入的,从那时起它就是与匹配项交互的推荐方式。人们仍然可以使用老式的 API,比如 select/flatSelect,它将在内部被转换成 PatternProcessFunction。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)

 CEP library 中的时间

Event Time处理迟到的数据

在CEP中,处理元素的顺序很重要。在使用 Event Time 时保证元素以正确的顺序处理,传入的元素是最初把缓冲区中元素是根据他们的时间戳按升序排序,当一个水印来临,在这个缓冲时间戳的所有元素小于水印处理。这意味着 watermarks 之间的元素是按 event-time 顺序处理的。

注意:当在 event time 工作时,库假设 watermark 是正确的。

为了保证跨 watermarks 的元素按 event-time 顺序处理,Flink 的 CEP 库假设 watermarks 是正确的,并将其视为时间戳小于最后看到的 watermark 的最新元素。 后来的元素没有进一步处理。另外,您可以指定一个sideOutput标记来收集最后一次看到的水印之后的最新元素,您可以像这样使用它。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag[String]("late-data")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
      }

val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)

时间Context

在 PatternProcessFunction 和 IterativeCondition 中,用户可以访问实现 TimeContext 的上下文,如下所示:

/**
 * Enables access to time related characteristics such as current processing time or timestamp of
 * currently processed element. Used in {@link PatternProcessFunction} and
 * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
 */
@PublicEvolving
public interface TimeContext {

	/**
	 * Timestamp of the element currently being processed.
	 *
	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
	 * will be set to the time when event entered the cep operator.
	 */
	long timestamp();

	/** Returns the current processing time. */
	long currentProcessingTime();
}

此上下文允许用户访问已处理事件的时间特征。调用 TimeContext#currentProcessingTime 总是返回当前处理时间的值,这个调用应该是首选的,例如调用 System.currentTimeMillis()。

对于 TimeContext#timestamp(),返回的值等于对于 EventTime 分配的时间戳。在 ProcessingTime 中,这将等于该事件输入 cep 操作符时的时间点(或者在 PatternProcessFunction 中生成匹配时的时间点)。这意味着该值在对该方法的多个调用中是一致的。

Examples

下面的示例检测键控事件数据流的模式开始、中间(name = "error") ->结束(name = "critical")。事件由它们的id进行键控,有效的模式必须在10秒内出现。整个处理是用 event-time 完成的。

val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))

从较老的Flink版本迁移(1.3之前)

迁移到1.4 +

在Flink-1.4中,CEP库与<= Flink 1.2的向后兼容性被删除。不幸的是,无法恢复曾经在1.2.x中运行的CEP作业

迁移到1.3 +

Flink-1.3中的CEP库附带了许多新特性,这些特性导致了API中的一些变化。在这里,我们将描述您需要对旧的CEP作业进行的更改,以便能够使用Flink-1.3运行它们。在进行这些更改并重新编译作业之后,您将能够从旧版本作业的保存点重新执行作业,即不必重新处理以前的数据。

所需要的改变是:

  1. 更改条件(where(…)子句中的条件)以扩展 SimpleCondition 类,而不是实现 FilterFunction 接口。
  2. 将作为参数提供的函数更改为 select(…)和 flatSelect(…)方法,以期望得到与每个模式相关的事件列表(Java中的List,Scala中的Iterable)。这是因为通过添加循环模式,多个输入事件可以匹配单个(循环)模式。
  3. Flink 1.1和1.2中的followedBy()表示不确定的松弛邻近(参见这里)。在Flink 1.3中,这已经发生了变化,followedBy()意味着放松的接近,而如果需要不确定的放松的接近,应该使用followedByAny()。
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐