快速入门
1. 创建工程
引入POM.xml依赖:
xml
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink-version>1.17.2</flink-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
2. 需求实操
检测用户行为,如果连续三次登录失败,就输出报警信息。
3. 编码实操
定义一个登录事件POJO类:
java
public class LoginEvent {
// 用户id
public String userId;
// 用户ip地址
public String ipAddress;
// 用户登录成功与否
public Boolean eventType;
// 登录时间戳
public Long timestamp;
public LoginEvent() {}
public LoginEvent(String userId, String ipAddress, Boolean eventType, Long timestamp){
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
// 省略toString 有参构造
}
编写事件处理:
java
public class LoginDetectExample {
public static void main(String[] args) throws Exception {
// 创建一个表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(500); // 100毫秒生成一次水位线
// 1.用户登录事件
// 时间戳的提取器
SingleOutputStreamOperator<LoginEvent> streamOperator = env.fromElements(
new LoginEvent("1", "192.168.10.1", false, 1000L),
new LoginEvent("2", "192.168.10.6", true, 2000L),
new LoginEvent("1", "192.168.10.1", false, 5000L),
new LoginEvent("2", "192.168.10.6", false, 5000L),
new LoginEvent("1", "192.168.10.1", false, 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy
.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 延迟2秒保证数据正确
.withTimestampAssigner((event, l) -> event.timestamp)
);
// 2.定义模式
// 2.1 模式的第一个事件是用户登陆失败
Pattern<LoginEvent, LoginEvent> loginEventPattern = Pattern.<LoginEvent>begin("first-false")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) {
return !loginEvent.eventType; // 类型为false的则代表登陆失败
}
}) // next衔接模式的第二个事件
.next("second-false")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) {
return !loginEvent.eventType;
}
}) // 以后的每个事件都用next衔接即可
.next("third-false")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) {
return !loginEvent.eventType;
}
});
// 3.将模式应用到数据流,检测复杂事件
PatternStream<LoginEvent> patternStream =
CEP.pattern(streamOperator.keyBy(event -> event.userId), loginEventPattern);
// 4.提取复杂事件,进行处理 select类似于map只不过我们处理的是一组事件
SingleOutputStreamOperator<String> warningOut =
// 这里是一个map map的key就是我们定义的事件名称,value对应的事件列表,
// 我们这里列表里只有一个事件,为什么是列表,因为我们定义的一个事件,它可能会重复发生
patternStream.select(map -> {
// 提取三次事件
LoginEvent firstFailEvent = map.get("first-false").get(0);
LoginEvent secondFailEvent = map.get("second-false").get(0);
LoginEvent thirdFailEvent = map.get("third-false").get(0);
return "用户 " + firstFailEvent.userId + "在" +
Duration.ofMillis(firstFailEvent.timestamp).getSeconds() + "秒、" +
Duration.ofMillis(secondFailEvent.timestamp).getSeconds() + "秒、" +
Duration.ofMillis(thirdFailEvent.timestamp).getSeconds() +
"秒 已连续三次登陆失败!";
});
// 5.打印输出
warningOut.print();
env.execute();
}
}