27

基于 Flink 实时计算商品订单流失量

 3 years ago
source link: https://mp.weixin.qq.com/s/GTq-xV0-RC7-kt2QucGBYw
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

假设有个需求需要实时计算商品的订单流失量,规则如下:

  • 用户点击商品 A,但购买了同类商品 B,则商品 A 记为一次订单流失量;

  • 点击商品 A 到购买同类商品 B 的有效时间窗口应该小于 12 个小时;

  • 有效窗口内多次点击商品 A 视为一次订单流失。

第三条规则可以理解为数据流去重,我在上一节已经介绍过了。为了更加专注于计算商品的订单流失量,本篇文章不再关注数据去重。

看到这个需求,想到可以用上一节的 ProcessFunction 进行状态管理,比如说基于用户进行分流,然后每个用户维护一个状态和一个有效时间窗口,触发购买同类事件后进行数据统计,过了有效期后舍弃。

但是,有没有更优雅的一点方式呢?

答案是有的,我们可以使用 Flink 自带的 CEP 来实现。

下面先简单介绍下 FlinkCEP,然后给出代码实践。

1.FlinkCEP

1.1 什么是 CEP

CEP 全称为 Complex Event Process,是在 Flink 之上实现的复杂事件处理(CEP)库。它允许你在无界的事件流中检测事件模式,让你有机会掌握数据中重要的事项。

例如:“起床-->洗漱-->吃饭-->上班”这一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

再举几个经典例子:

  • 异常检测:打车计费后 12 小时还未结束订单;用户短时间内连续完成多个订单;

  • 实时营销:用户在不同平台进行比价;

  • 数据监控:检测某些指标,比如订单流失量。

1.2 FlinkCEP 原理

FlinkCEP 内部是用 「NFA(非确定有限自动机) 「来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为」 起始状态」 「中间状态」「最终状态」 三种,边分为 「take」「ignore」「proceed」 三种。

zeURNnm.jpg!mobile
  • 「take」:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。

  • 「ignore」:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。

  • 「proceed」:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条 proceed 边和下游的购买状态相连。

当然,在我们的场景中不会涉及太多复杂的概念。

2.FlinkCEP 简单上手

本节内容引用参考 1,用于完成基本的概念讲解和 Demo 实现。

2.1 单个 Pattern

我们先从简单的内容入手。看看在单个Pattern下,Flink CEP是如何匹配的。

2.1.1 各个API的用法

在学习 Flink CEP 的过程中,很容易找到相似的博文,文章中使用表格列举出了各个 API 的作用。然而大家很容易发现,这东西太像正则表达式了(实际上底层匹配逻辑的实现方式应该也和正则表达式类似)。因此,结合正则表达式理解这些 API 显得十分快速,所以我自作主张,加上了功能相近的正则表达式。例如,我们要用 CEP 匹配字母 x:

E3QrIr7.png!mobile

2.1.2 仅使用 where 和 or 写一个程序

比如说,我们现在有一个简单的需求,对于输入的数据流中,匹配所有以 x 或 y 开头的数据:

public class CepDemo {
public static void main(String[] args) throws Exception {
var environment = StreamExecutionEnvironment.getExecutionEnvironment();
var stream = environment.setParallelism(1).addSource(new ReadLineSource("Data.txt"));

// 使用 where 和 or 来定义两个需求;
// 当然也可以放在一个 where 里。
var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("x");
}
}).or(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return s.startsWith("y");
}
});

// CEP.pattern 的第一个参数是数据流,第二个是规则;
// 然后利用 select 方法抽取出匹配到的数据。
// 这里用了 lambda 表达式
CEP.pattern(stream, pattern).select((map ->
Arrays.toString(map.get("start").toArray()))
).addSink(new SinkFunction<>() {
@Override
public void invoke(String value, Context context) {
System.out.println(value);
}
});
environment.execute();
}
}

对于输入的数据流:

x1
z2
c3
y4

我们有输出:

读取:x1   
[x1]
读取:z2
读取:c3
读取:y4
[y4]

可以看到,Flink CEP 可以根据输入的每一条数据进行匹配。单条数据可以是本文中的字符串,也可以是复杂的事件对象,当然也可以是字符。如果每一条数据都是一个字符,那 CEP 就和正则表达式十分相似了。

2.1.3 加上量词

接下来,还是在单个 Pattern 中,我们加上量词 API,研究研究 Flink CEP 是如何匹配多条数据的。从这里开始,事情和正则表达式有了一些差距。差距主要在结果的数量上。由于是流计算,因此在实际处理过程中,Flink 无法知道后续的数据,所以会输出所有匹配的结果。

例如,使用 timesOrMore() 函数,匹配以 a 开头的字符串出现 3 次及以上的情况,首先编写代码(其他代码与上方的例子完全一致,为节约篇幅不再列出,下同):

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).timesOrMore(3);

随后在Data.txt中输入如下字符串序列:

a1
a2
a3
b1
a4

运行程序,输出如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]

下面分析一下执行流程。程序开始后,等待数据流入。当a1和a2输入后,由于暂时不满足条件,所以没有产生结果,只是将数据储存在状态中。a3到来后,第一次满足了匹配条件,因此程序输出结果 [a1, a2, a3] 。随后,b1输入,不满足条件;接下来a4输入。此时,a1、a2和a3依旧储存在状态中,因此依然可以参与匹配。匹配可以产生多个结果,但是有两个原则:

  1. 必须严格按照数据流入的顺序;

  2. 产生的结果必须包含当前元素;

原则 1 很好理解,由于数据的流入是按照 a1 -> a2 -> a3 -> a4 的顺序,所以结果生成的序列也必须按照这个顺序,不能删减中间数据,更不能打乱顺序。因此, [a1, a2, a4] 和  [a3, a2, a4, a1] 这种结果是不可能生成的。原则 2 就更好理解了,数据是因为 a4 的流入才产生的,再考虑到我们设定的量词条件是“三个及以上”,因此产生的结果只可能是 [a2, a3, a4] 和 [a1, a2, a3, a4]。

同理,如果我们在 Data.txt 最后加入一行 a5,则程序输出结果如下:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]
读取:a5
[a1, a2, a3, a4, a5]
[a2, a3, a4, a5]
[a3, a4, a5]

按照这种思路,如果我们继续加上 a6、a7、a8、……、a100,那么每个数据产生的结果会越来越多,因为 Flink CEP 会把所有符合条件的数据储存在状态里。 「这样下去不行的,要不然内存养不起它的」 。因此,oneOrMore() 和 timesOrMore() 之类的函数后面,一般都要跟上 until() 函数,从而指定终止条件。

2.1.4 把量词换成 times()

如果使用和上面一样的数据,但是把量词换成 times(3),会产生什么样的结果?我们首先修改代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(3);

由于固定了只匹配三个,再加上前文提到的两个原则的束缚,结果就很明显了:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a2, a3, a4]
读取:a5
[a3, a4, a5]

从 a1 到 b1 的逻辑完全相同,当读取到 a4 时,由于只匹配 3 个,同时结果必须包含 a4,因此产生的结果只能是 [a2, a3, a4] 。同理读取到 a5 后,由于结果必须包含 a5 且只匹配 3 个,所以结果只能是 [a3, a4, a5] 。这种情况下,过期的数据会被清理掉,妈妈再也不用担心我的内存不够用了。

除了固定参数,times() 函数还支持 times(from, to) 指定边界。这种情况下的匹配结果和上文类似,相信大家很容易就能推出来,在此我就不再赘述了。

2.1.5 使用严格模式

大家也许注意到,上文的 Data.txt 中,一直有一个讨厌的 b1。由于不满足我们的基本匹配条件,b1 直接被我们的程序忽略掉了。这是因为 Flink CEP 默认采用了不严格的匹配模式,而在某些情况下,这种数据是不能忽略的,这时候就可以使用 consecutive() 函数,指定严格的匹配模式。修改代码如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(3).consecutive();

运行程序,产生如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
读取:a5

此时,由于 a1、a2、a3 是紧密相连的,因此被成功匹配。而 a2、a3、a4 和 a3、a4、a5 中间由于多了一个 b1,在严格模式下不能被匹配。可以看出,严格模式下的匹配策略更像正则表达式。

2.2 多个 Pattern

一般而言,需要使用 CEP 的任务都得依靠多个 Pattern 才能解决。此时,可以使用 followedBy()、next() 等函数创建一个新的 Pattern,并按照不同的逻辑将新 Pattern 和前一个 Pattern 连接起来。

2.2.1 使用 followedBy() 创建一个新的 Pattern

我们再来看一下如何处理多个 Pattern,比如说我们需要匹配“包含 2-3 个 a 开头的字符串,同时包含 1-2 个 b 开头的字符串”的输入数据。

// 我们用 times(2,3) 来控制匹配 2-3 次;
// followBy 用于控制两个具有顺序的关系的 Pattern。
var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(2, 3).followedBy("middle").where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return s.startsWith("b");
}
}).times(1, 2);

CEP.pattern(stream, pattern).select(map -> {
// 把匹配的结果装进 list 中。
var list = map.get("start");
list.addAll(map.get("middle"));
return Arrays.toString(list.toArray());
}).addSink(new SinkFunction<>() {
@Override
public void invoke(String value, Context context) {
System.out.println(value);
}
});

这里我们使用了 followedBy () 函数,该函数创建了一个名为 “middle” 的新 Pattern,新 Pattern 中包含了指向原 Pattern 的引用。同样发生变化的是 select 函数中的 lambda 表达式。在表达式中,我们除了获取名为 “start” 的 Pattern 中的数据,还获取了名为 “middle” 的 Pattern 的数据,并将他们拼在一起。这与正则表达式中的子表达式特别类似,实际上,我们可以将每个 Pattern 近似看作一个子表达式,在读取结果的时候,使用 Pattern 的名字,从 map 中提取出结果。

数据的输入为:

a1
a2
a3
b1
a4
a5
b2

数据输出为:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a1, a2, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a1, a2, b1, b2]
[a2, a3, a4, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a3, a4, b2]
[a4, a5, b2]

一下子产生了这么多数据,我一开始还是很懵的。接下来我们逐步分析下:

  1. a1, a2 依次读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a1, a2] 这一中间结果,存在状态中;

  2. a3 读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a2, a3] 和 [a1, a2, a3] 两个结果;

  3. b1 读入,满足 “middle” 条件,产生 [b1] 中间结果。此时整体条件满足,因此和上述中间结果组合输出 [a1, a2, a3, b1] 、 [a1, a2, b1] 和 [a2, a3, b1] ;

  4. a4 读入,继续满足 “start” 条件,产生 [a2, a3, a4] 和 [a3, a4];两个结果,但是由于这两个结果是在 b1 读入之后产生的,因此这两个结果不能和 [b1] 进行组合;

  5. a5 读入,继续满足 “start” 条件,产生 [a3, a4, a5] 和 [a4, a5] 两个中间结果,同理不能和 [b1] 进行组合;

  6. b2 读入,继续满足 “middle” 条件,产生 [b1, b2] 和 [b2] 两个中间结果。这里开始比较复杂了,需要严格结合时间顺序来分析。由于 b1 是在 a4 之前读入的,因此包含 b1 的序列 [b1, b2] 只能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联。而 [b2] 则可以与包含了 a4 或 a5 的 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列关联,因此此时输出结果如下:

[a1, a2, a3, b1, b2]    // [a1, a2, a3] 和 [b1, b2] 关联
[a1, a2, b1, b2] // [a1, a2] 和 [b1, b2] 关联
[a2, a3, a4, b2] // [a2, a3, a4] 和 [b2] 关联
[a2, a3, b1, b2] // [a2, a3] 和 [b1, b2] 关联
[a3, a4, a5, b2] // [a3, a4, a5] 和 [b2] 关联
[a3, a4, b2] // [a3, a4] 和 [b2] 关联
[a4, a5, b2] // [a4, a5] 和 [b2] 关联

那么有一个问题,为什么 [b2] 不能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联呢?还是要站在时间序列的角度进行解释。因为只有 b1 是跟随在这三个元素后面的,所以只有包含 b1 的两个序列([b1] 和 [b1, b2])可以和它们进行关联,这就是 followedBy 的含义。为了验证这一观点,我们在 Data.txt 最后加上一个 b3,在其他代码均不变的情况下,最后读入 b3 后,输出如下结果:

[a2, a3, a4, b2, b3]
[a3, a4, a5, b2, b3]
[a3, a4, b2, b3]
[a4, a5, b2, b3]

分析如下:当读入 b3 后,满足 “middle” 条件,生成 [b2, b3] 和 [b3]。其中,只有 [b2, b3] 包含了 b2,由于 b2 是距离 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列最近的数据,因此只有 [b2, b3] 才能和上述四个序列关联。而 [b3] 由于不包含 b2,因此无法和它们关联。

2.2.2 将 followedBy() 换成 next()

可以将 next () 看作是加强版的 followedBy ()。在 followedBy 中,两个 Pattern 直接允许不紧密连接,例如上文中的 [a1, a2] 和 [b1] ,他们中间隔了一个 a3. 这种数据在 next () 中会被丢弃掉。使用上文同样的数据(不包括 b3),将代码中的 followedBy 换成 next,修改如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(2, 3).next("middle").where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return s.startsWith("b");
}
}).times(1, 2);

运行后,看到如下结果:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a4, a5, b2]

和之前的结果进行分析,发现结果中的 [a1, a2, b1] 、 [a1, a2, b1, b2]、 [a2, a3, a4, b2] 和 [a3, a4, b2] 均被排除,因为他们相比原序列,分别缺少了 a3、a3、a5、a5。

2.2.3 greedy() 做了什么

关于 greedy () 的用法,可以说是十分令人迷惑的。我看了许多文章,对 greedy () 的描述几乎都是一笔带过。描述大多是 “尽可能多的匹配”,但是实际上,大多数情况下加不加 greedy () 几乎没有任何区别。 「因为 greedy () 虽然被归为量词 API,但是它实际上是在多个 Pattern 中才能起作用的。」 为此,我找到了 greedy () 的实现逻辑,在 NFACompiler 类的 updateWithGreedyCondition 方法中,代码如下:

private void updateWithGreedyCondition(
State<T> state,
IterativeCondition<T> takeCondition) {
for (StateTransition<T> stateTransition : state.getStateTransitions()) {
stateTransition.setCondition(
new RichAndCondition<>(stateTransition.getCondition(),
new RichNotCondition<>(takeCondition)));
}
}

阅读代码,发现该方法实际上添加了一个逻辑: 「确认当前条件满足转换到下一个 state 所需的条件,且不满足当前 state 的条件」 。意思就是,如果当前处于 Pattern1,但是出现了一条同时满足两个 Pattern1 和 Pattern2 条件的数据,在不加 greedy () 的情况下,会跳转到 Pattern2,但是如果加了 greedy (),则会留在 Pattern1。下面我们来验证一下,编写如下代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(2, 3).next("middle").where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return s.length() == 3;
}
}).times(1, 2);

在这一代码中,如果一条数据为a开头,且长度为3,则同时满足“start”和“middle”。同时,为了方便区分数据到底属于哪个Pattern,我们在输出前加入分隔符:

CEP.pattern(stream, pattern).select(map -> {
var list = map.get("start");
list.add("|");
list.addAll(map.get("middle"));
return Arrays.toString(list.toArray());
}).addSink(new SinkFunction<>() {
@Override
public void invoke(String value, Context context) {
System.out.println(value);
}
});

准备如下数据:

a
a1
a22
b33

在不加greedy()的情况下,运行结果如下:

读取:a
读取:a1
读取:a22
[a, a1, |, a22]
读取:b33
[a, a1, a22, |, b33]
[a, a1, |, a22, b33]
[a1, a22, |, b33]

观察结果,可知a22在两个Pattern中左右横跳,输出了所有可能的结果。接下来我们加上greedy():

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) {
return s.startsWith("a");
}
}).times(2, 3).greedy().next("middle").where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return s.length() == 3;
}
}).times(1, 2);

运行结果如下:

读取:a
读取:a1
读取:a22
读取:b33
[a, a1, a22, |, b33]
[a1, a22, |, b33]

此时,a22 被划到了 “start” 这一 Pattern 中。由此可见,greedy () 影响的是 “同时满足两个 Pattern 条件的数据的划分逻辑”,而且加了 greedy () 后,产生的结果会变少,并不是直观印象中的,产生尽可能多条的数据。

2.代码实践

简单看一下代码,主要以注释方式进行讲解

输入数据为:

952483,310884,4580532,pv,1511712000
952483,5119439,982926,pv,1511712000
952483,4484065,1320293,pv,1511712000
952483,5097906,149192,pv,1511712000
952483,2348702,3002561,pv,1511712000
952483,2157435,1013319,buy,1511712020
952483,1132597,4181361,pv,1511712020
952483,3505100,2465336,pv,1511712020
952483,3815446,2342116,pv,1511712030
952483,3815446,2442116,buy,1511712030

数据源代码为:

package com.aze.producer;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;

/**
* @Author: aze
* @Date: 2020-09-16 14:41
*/

public class ReadLineSource implements SourceFunction<String> {

private String filePath;
private boolean canceled = false;

public ReadLineSource(String filePath){
this.filePath = filePath;
}

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
BufferedReader reader = new BufferedReader(new FileReader(filePath));
while (!canceled && reader.ready()){
String line = reader.readLine();
sourceContext.collect(line);
}
}

@Override
public void cancel() {
canceled = true;
}
}

主代码为

package com.aze.consumer;

import lombok.val;
import com.aze.producer.ReadLineSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
* 订单流失率
*
* @Author: aze
* @Date: 2020-09-23 14:45
*/

public class OrderLostCEP {

public static void main(String[] args) throws Exception {

val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"));

// 先配置一下事件时间
// 然后利用 uid 和商品类别进行分组(商品类别的第一个字母代表一级类别)
val keyStream = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((SerializableTimestampAssigner<String>)
(s, l) -> Long.parseLong(s.split(",")[4]) * 1000))
.keyBy((KeySelector<String, String>) s ->
s.split(",")[0] + "-" + s.split(",")[2].substring(0, 1));

// 我们采用不丢弃的策略,主要逻辑在于,点击商品 A,而购买同类商品 B 和同类商品 C 算作两次订单流失
val noSkip = AfterMatchSkipStrategy.noSkip();

// 制定一个匹配规则;
// 用 followedByAny 指定不确定的松散连续,读者可以试下其与 followedBy 的区别。
val pattern = Pattern
.<String>begin("start", noSkip).where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> ctx) {
return "pv".equals(s.split(",")[3]);
}
}).within(Time.minutes(10))
.followedByAny("end").where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> ctx) {
return "buy".equals(s.split(",")[3]);
}
});

// 经过 CEP 规则匹配后,抽取点击的事件流
// 利用商品 id 进行分组,并利用 process 进行状态统计。
val patStream = CEP.pattern(keyStream, pattern)
.select(map -> map.get("start").get(0))
.keyBy((KeySelector<String, String>) s -> s.split(",")[1])
.process(new KeyedProcessFunction<String, String, Object>() {

private ValueState<Long> clickState;

@Override
public void open(Configuration parameters) {
clickState = getRuntimeContext().getState(
new ValueStateDescriptor<>("OrderLost", Long.class));
}

@Override
public void processElement(String in, Context ctx, Collector<Object> out)
throws Exception
{

Long clickValue = clickState.value();
clickValue = clickValue == null ? 1L : ++clickValue;
clickState.update(clickValue);
out.collect("【" + in.split(",")[1] + "】OrderLost:" + clickValue);

}
});

patStream.print();

env.execute("test");

}

}

结果:

【3505100】OrderLost:1
【3815446】OrderLost:1
【4484065】OrderLost:1
【5097906】OrderLost:1

3.总结

本文主要介绍了如何使用 FlinkCEP,并给出诸多 Demo 进行学习。

但完成开头的需求是,我采用的是基于 uid 和商品类别进行分组,然后用 cep 去挖掘配对规则。当然也可以先基于 uid 进行分组,然后用 cep 挖掘配对模式 [点击商品、购买商品],然后利用 select 去过滤是否是同类商品。

最后留一个新需求:如果需要同时计算商品的下单量、CTR 该怎么操作?

4.参考

  1. 《探索如何使用Flink CEP》

  2. 《Apache Flink CEP 实战》


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK