当前位置: 首页 > news >正文

大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink CEP 开发的流程
  • CEP 开发依赖
  • CEP 案例:恶意登录检测实现

在这里插入图片描述

Fline CEP

之前已经介绍过,但是防止大家没看到,这里再简单介绍以下。

基本概念

Flink CEP(Complex Event Processing)是Apache Flink提供的一个扩展库,用于实时复杂事件处理。通过Flink CEP,开发者可以从流数据中识别出特定的事件模式。这在欺诈检测、网络安全、实时监控、物联网等场景中非常有用。

Flink CEP的核心是通过定义事件模式,从流中检测复杂事件序列。
具体来说,CEP允许用户:

  • 定义事件模式:用户可以描述感兴趣的事件组合(如连续事件、延迟事件等)。
  • 匹配模式:Flink CEP从流中搜索与定义模式相匹配的事件序列。
  • 处理匹配结果:一旦找到符合模式的事件序列,用户可以定义如何处理这些匹配。

基本组成部分

  • Pattern(模式):描述要在事件流中匹配的事件序列。可以是单个事件或多个事件的组合。常用的模式操作包括next(紧邻)、followedBy(接续)等。
  • PatternStream(模式流):通过应用模式定义,将事件流转变为模式流。
  • Select函数:用于从模式流中提取匹配的事件序列

CEP开发步骤

开发Flink CEP应用的基本步骤包括:

定义事件流:创建一个DataStream,表示原始的事件流。
定义事件模式:使用Flink CEP的API定义事件模式,例如连续事件、迟到事件等。
将模式应用到流中:将定义好的模式应用到事件流上,生成模式流PatternStream。
提取匹配事件:使用select函数提取匹配模式的事件,并定义如何处理这些事件。

使用场景

  • 欺诈检测:可以通过CEP识别连续发生的异常行为,如频繁的登录尝试等。
  • 网络监控:检测一段时间内的特定网络攻击模式。
  • 物联网:分析传感器数据,检测设备异常、温度异常等。
  • 用户行为分析:分析用户在某一时间段内的行为序列,从而作出预测或检测异常。

案例2:检测交易活跃用户

业务需求

业务上需要找出24小时内,至少5次有效交易的用户。
数据源如下:

new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
  • 获取数据源
  • Watermark转化
  • keyBy转化
  • 至少5次:timeOrMore(5)
  • 24小时之内:within(Time.hours(24))
  • 模式匹配
  • 提取匹配成功的数据

编写代码

package icu.wzk;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.util.List;
import java.util.Map;public class FlinkCepActiveUser {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);DataStreamSource<CepActiveUserBean> data = env.fromElements(new CepActiveUserBean("100XX", 0.0D, 1597905234000L),new CepActiveUserBean("100XX", 100.0D, 1597905235000L),new CepActiveUserBean("100XX", 200.0D, 1597905236000L),new CepActiveUserBean("100XX", 300.0D, 1597905237000L),new CepActiveUserBean("100XX", 400.0D, 1597905238000L),new CepActiveUserBean("100XX", 500.0D, 1597905239000L),new CepActiveUserBean("101XX", 0.0D, 1597905240000L),new CepActiveUserBean("101XX", 100.0D, 1597905241000L));SingleOutputStreamOperator<CepActiveUserBean> watermark = data.assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {@Overridepublic WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<CepActiveUserBean>() {long maxTimestamp = Long.MAX_VALUE;long maxOutOfOrderness = 500L;@Overridepublic void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));}};}}.withTimestampAssigner((element, recordTimes) -> element.getTimestamp()));KeyedStream<CepActiveUserBean, String> keyed = watermark.keyBy(new KeySelector<CepActiveUserBean, String>() {@Overridepublic String getKey(CepActiveUserBean value) throws Exception {return value.getUsername();}});Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern.<CepActiveUserBean>begin("start").where(new SimpleCondition<CepActiveUserBean>() {@Overridepublic boolean filter(CepActiveUserBean value) throws Exception {return value.getPrice() > 0;}}).timesOrMore(5).within(Time.hours(24));PatternStream<CepActiveUserBean> parentStream = CEP.pattern(keyed, pattern);SingleOutputStreamOperator<CepActiveUserBean> process = parentStream.process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {@Overridepublic void processMatch(Map<String, List<CepActiveUserBean>> map, Context context, Collector<CepActiveUserBean> collector) throws Exception {System.out.println("map: " + map);}});process.print();env.execute("FlinkCepActiveUser");}}class CepActiveUserBean {private String username;private Double price;private Long timestamp;public CepActiveUserBean(String username, Double price, Long timestamp) {this.username = username;this.price = price;this.timestamp = timestamp;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public Double getPrice() {return price;}public void setPrice(Double price) {this.price = price;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "CepActiveUserBean{" +"username='" + username + '\'' +", price=" + price +", timestamp=" + timestamp +'}';}
}

运行结果

map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}, CepActiveUserBean{username='100XX', price=200.0, timestamp=1597905236000}, CepActiveUserBean{username='100XX', price=300.0, timestamp=1597905237000}, CepActiveUserBean{username='100XX', price=400.0, timestamp=1597905238000}, CepActiveUserBean{username='100XX', price=500.0, timestamp=1597905239000}]}Process finished with exit code 0

运行结果如下图所示:
在这里插入图片描述

案例3:超时未支付

业务需求

找出下单后10分钟没有支付的订单,数据源如下:

new TimeOutPayBean(1L, "create", 1597905234000L),
new TimeOutPayBean(1L, "pay", 1597905235000L),
new TimeOutPayBean(2L, "create", 1597905236000L),
new TimeOutPayBean(2L, "pay", 1597905237000L),
new TimeOutPayBean(3L, "create", 1597905239000L)
  • 获取数据源
  • 转 Watermark
  • keyBy 转化
  • 做出 Pattern (下单以后10分钟未支付)
  • 模式匹配
  • 取出匹配成功的数据

编写代码

package icu.wzk;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;public class FlinkCepTimeOutPay {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);DataStreamSource<TimeOutPayBean> data = env.fromElements(new TimeOutPayBean(1L, "create", 1597905234000L),new TimeOutPayBean(1L, "pay", 1597905235000L),new TimeOutPayBean(2L, "create", 1597905236000L),new TimeOutPayBean(2L, "pay", 1597905237000L),new TimeOutPayBean(3L, "create", 1597905239000L));DataStream<TimeOutPayBean> watermark = data.assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {@Overridepublic WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<TimeOutPayBean>() {long maxTimestamp = Long.MAX_VALUE;long maxOutOfOrderness = 500L;@Overridepublic void onEvent(TimeOutPayBean event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));}};}}.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));KeyedStream<TimeOutPayBean, Long> keyedStream = watermark.keyBy(new KeySelector<TimeOutPayBean, Long>() {@Overridepublic Long getKey(TimeOutPayBean value) throws Exception {return value.getUserId();}});// 逻辑处理代码OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<>("orderTimeout") {};Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern.<TimeOutPayBean>begin("begin").where(new IterativeCondition<TimeOutPayBean>() {@Overridepublic boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {return timeOutPayBean.getOperation().equals("create");}}).followedBy("pay").where(new IterativeCondition<TimeOutPayBean>() {@Overridepublic boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {return timeOutPayBean.getOperation().equals("pay");}}).within(Time.seconds(600));PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);SingleOutputStreamOperator<TimeOutPayBean> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {@Overridepublic TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long l) throws Exception {return map.get("begin").get(0);}}, new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {@Overridepublic TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) throws Exception {return map.get("pay").get(0);}});// 输出结果// result.print();System.out.println("==============");DataStream<TimeOutPayBean> sideOutput = result.getSideOutput(orderTimeoutOutput);sideOutput.print();// 执行env.execute("FlinkCepTimeOutPay");}}class TimeOutPayBean {private Long userId;private String operation;private Long timestamp;public TimeOutPayBean(Long userId, String operation, Long timestamp) {this.userId = userId;this.operation = operation;this.timestamp = timestamp;}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public String getOperation() {return operation;}public void setOperation(String operation) {this.operation = operation;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "TimeOutPayBean{" +"userId=" + userId +", operation='" + operation + '\'' +", timestamp=" + timestamp +'}';}
}

运行结果

控制台输出为:

==============
TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000}
TimeOutPayBean{userId=3, operation='create', timestamp=1597905239000}
TimeOutPayBean{userId=2, operation='pay', timestamp=1597905237000}Process finished with exit code 0

对应截图如下:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Java 设计模式-状态模式
  • prometheus基于consul的服务发现
  • 了解MySQL 高可用架构:主从备份
  • 【H2O2|全栈】关于CSS(3)CSS基础(三)
  • 大屏自适应缩放解决方案
  • 跨平台集成:在 AI、微服务和 Azure 云之间实现无缝工作流
  • 如何在YoloV8中添加注意力机制(两种方式)
  • PyTest装饰器
  • 腾讯云、阿里云、华为云优惠券领取、查看、使用教程分享
  • C++ 中的 override 和 overload的区别
  • 旋转电连接器航空插头插座的特点
  • 《深度学习》OpenCV轮廓检测 模版匹配 解析及实现
  • QT信号槽原理是什么,如何去使用它?
  • [前端][JS]html中js不同位置的区别
  • 87-java 可轮询锁和定时锁
  • CSS 提示工具(Tooltip)
  • el-input获取焦点 input输入框为空时高亮 el-input值非法时
  • HTML中设置input等文本框为不可操作
  • JavaScript设计模式之工厂模式
  • JAVA之继承和多态
  • Python打包系统简单入门
  • React-Native - 收藏集 - 掘金
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 回顾2016
  • 计算机常识 - 收藏集 - 掘金
  • 理解在java “”i=i++;”所发生的事情
  • 双管齐下,VMware的容器新战略
  • 跳前端坑前,先看看这个!!
  • Salesforce和SAP Netweaver里数据库表的元数据设计
  • ​云纳万物 · 数皆有言|2021 七牛云战略发布会启幕,邀您赴约
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (23)Linux的软硬连接
  • (6)设计一个TimeMap
  • (Repost) Getting Genode with TrustZone on the i.MX
  • (ros//EnvironmentVariables)ros环境变量
  • (webRTC、RecordRTC):navigator.mediaDevices undefined
  • (安卓)跳转应用市场APP详情页的方式
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (分布式缓存)Redis哨兵
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (七)理解angular中的module和injector,即依赖注入
  • (三)Kafka离线安装 - ZooKeeper开机自启
  • (转)mysql使用Navicat 导出和导入数据库
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...
  • .equal()和==的区别 怎样判断字符串为空问题: Illegal invoke-super to void nio.file.AccessDeniedException
  • .net 7和core版 SignalR
  • .Net Winform开发笔记(一)
  • .net安装_还在用第三方安装.NET?Win10自带.NET3.5安装
  • .Net开发笔记(二十)创建一个需要授权的第三方组件
  • .NET使用存储过程实现对数据库的增删改查
  • .NET中分布式服务
  • ??javascript里的变量问题
  • @GetMapping和@RequestMapping的区别
  • @ResponseBody