当前位置: 首页 > news >正文

使用[KafkaStreams流计算框架实时计算产生报警(升级报警)

使用KafkaStream(Apache Kafka)实时计算报警,官方文档非常完善。

对Kafka不太了解的,可以看下我的博客Kafka集群部署和调优实践_offsets.topic.replication.factor-CSDN博客

需求背景很简单,每秒钟采集一次设备数据,流计算框架需要对数据做处理,判断采集值超过100就产生报警,如果持续5分钟产生高报,持续10分钟产生高高报。流计算服务只负责产出报警到topic,下游服务负责监听topic后续处理。需要注意,当报警被处置后会向接收数据的主题发送处置信号,处置后需要重置这个设备的时间窗口,它对应的报警从新开始计算。每个设备在报警未被处置前只会升级报警,不会重复报警

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;import java.time.Duration;
import java.util.Properties;public class SensorAlarmApp {public static void main(String[] args) {// 配置 Kafka StreamsProperties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensor-alarm-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());StreamsBuilder builder = new StreamsBuilder();// 从 sensor-readings 主题读取传感器数据KStream<String, Double> readings = builder.stream("sensor-readings");// 从 sensor-reset-topic 主题读取报警处置信号KStream<String, Double> resetStream = builder.stream("sensor-reset-topic");// 合并传感器数据流和重置信号流KStream<String, Double> filteredReadings = readings.merge(resetStream) // 合并数据流.filter((k, v) -> true); // 可以添加更多的过滤逻辑,如果需要的话// 使用 SessionWindows 来处理数据流的窗口SessionWindows sessionWindows = SessionWindows.with(Duration.ofMinutes(10)).grace(Duration.ofMinutes(5));// 将数据流转换为 KTable,并使用 SessionWindows 计算报警次数KTable<SessionWindowed<String>, Long> alarmCounts = filteredReadings.filter((k, v) -> v > 100) // 只处理值大于100的记录.groupBy((k, v) -> k) // 按照传感器ID分组.windowedBy(sessionWindows) // 使用 SessionWindows 窗口.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("alarm-count-store").withValueSerde(Serdes.Long()));// 创建一个状态存储,用于跟踪报警状态final String alarmStateStoreName = "alarm-state-store";final KeyValueStore<String, AlarmStatus> alarmStateStore = builder.store(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(alarmStateStoreName),Serdes.String(),AlarmStatus.serde()).withCachingEnabled());// 处理报警KTable<Windowed<String>, String> lowAlarms = alarmCounts.toStream().filter((k, v) -> v == 1) // 第一次超过100.filter((k, v) -> shouldTriggerAlarm(k.key(), "low", alarmStateStore)).mapValues((k, v) -> updateAlarmStatus(k.key(), "low", "ALARM: Sensor value over 100", alarmStateStore));// 处理高报KTable<Windowed<String>, String> highAlarms = alarmCounts.toStream().filter((k, v) -> k.window().end() - k.window().start() >= Duration.ofMinutes(5).toMillis()) // 窗口持续时间 >= 5分钟.filter((k, v) -> shouldTriggerAlarm(k.key(), "high", alarmStateStore)).mapValues((k, v) -> updateAlarmStatus(k.key(), "high", "HIGH ALARM: Sensor value over 100 for more than 5 minutes", alarmStateStore));// 处理高高报KTable<Windowed<String>, String> highHighAlarms = alarmCounts.toStream().filter((k, v) -> k.window().end() - k.window().start() >= Duration.ofMinutes(10).toMillis()) // 窗口持续时间 >= 10分钟.filter((k, v) -> shouldTriggerAlarm(k.key(), "high-high", alarmStateStore)).mapValues((k, v) -> updateAlarmStatus(k.key(), "high-high", "HIGH HIGH ALARM: Sensor value over 100 for more than 10 minutes", alarmStateStore));// 处置报警filteredReadings.foreach((k, v) -> handleAlarmDisposal(k, v, alarmStateStore, filteredReadings));// 输出报警通知lowAlarms.toStream().to("low-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));highAlarms.toStream().to("high-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));highHighAlarms.toStream().to("high-high-alarm-notifications", Produced.with(Serdes.String(), Serdes.String()));// 启动 Kafka Streams 实例KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();}// 更新报警状态的方法private static String updateAlarmStatus(String sensorId, String alarmType, String message, KeyValueStore<String, AlarmStatus> store) {AlarmStatus status = store.get(sensorId);if (status == null) {status = new AlarmStatus(); // 创建新的报警状态}status.setAlarmType(alarmType);status.setAlarmMessage(message);status.setLastUpdated(System.currentTimeMillis());store.put(sensorId, status); // 保存报警状态return message;}// 决定是否触发报警的方法private static boolean shouldTriggerAlarm(String sensorId, String alarmType, KeyValueStore<String, AlarmStatus> store) {AlarmStatus status = store.get(sensorId);if (status == null) {return true; // 初始状态,可以触发报警} else {if (alarmType.equals(status.getAlarmType())) {return false; // 报警类型相同,不触发}if ("low".equals(status.getAlarmType()) && "high".equals(alarmType)) {return true; // 升级到高报}if ("high".equals(status.getAlarmType()) && "high-high".equals(alarmType)) {return true; // 升级到高高报}return false; // 其他情况不触发}}// 处置报警的方法private static void handleAlarmDisposal(String sensorId, Double value, KeyValueStore<String, AlarmStatus> store, KStream<String, Double> readings) {if (value < 100) {store.remove(sensorId); // 清除报警状态// 发送设备的重置信号到 sensor-reset-topicreadings.filter((k, v) -> k.equals(sensorId)).to("sensor-reset-topic", Produced.with(Serdes.String(), Serdes.Double()));}}// 报警状态类static class AlarmStatus {private String alarmType;private String alarmMessage;private long lastUpdated;public String getAlarmType() {return alarmType;}public void setAlarmType(String alarmType) {this.alarmType = alarmType;}public String getAlarmMessage() {return alarmMessage;}public void setAlarmMessage(String alarmMessage) {this.alarmMessage = alarmMessage;}public long getLastUpdated() {return lastUpdated;}public void setLastUpdated(long lastUpdated) {this.lastUpdated = lastUpdated;}public static Serde<AlarmStatus> serde() {return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(AlarmStatus.class));}}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • AI基础 -- 练手之预测耗时方案
  • vllm 推理qwen gguf模型使用案例;openai接口调用、requests调用
  • ploarDNctf靶场[CRYPTO]你知道M型栅栏密码吗?、一闪一闪亮星星、interesting
  • JavaScript ES6+ 新特性
  • 《费曼学习法》
  • Android 12系统源码_输入系统(二)InputManagerService服务
  • Kubernetes存储Volume
  • 【STM32】时钟体系
  • 凌鸥学园电机控制学习盛宴,诚邀您的加入
  • 若依后端添加子模块swagger分区
  • MySQL中的事物详解
  • Electron程序逆向(asar归档解包)
  • YoloV8实战:使用YoloV8实现OBB框检测
  • 数据结构---单链表(常见的复杂操作)
  • OpenAI 神秘模型「草莓」预计今秋推出,ChatGPT 将迎重大升级|TodayAI
  • (ckeditor+ckfinder用法)Jquery,js获取ckeditor值
  • CSS盒模型深入
  • Golang-长连接-状态推送
  • Javascript基础之Array数组API
  • js数组之filter
  • Meteor的表单提交:Form
  • Node项目之评分系统(二)- 数据库设计
  • Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel
  • Spring Cloud中负载均衡器概览
  • Swoft 源码剖析 - 代码自动更新机制
  • Yeoman_Bower_Grunt
  • 给Prometheus造假数据的方法
  • 检测对象或数组
  • 京东美团研发面经
  • 聊聊flink的TableFactory
  • 我与Jetbrains的这些年
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • 一起来学SpringBoot | 第三篇:SpringBoot日志配置
  • 优化 Vue 项目编译文件大小
  • Java总结 - String - 这篇请使劲喷我
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • ​HTTP与HTTPS:网络通信的安全卫士
  • ​埃文科技受邀出席2024 “数据要素×”生态大会​
  • ​渐进式Web应用PWA的未来
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • #07【面试问题整理】嵌入式软件工程师
  • #define
  • #QT项目实战(天气预报)
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (仿QQ聊天消息列表加载)wp7 listbox 列表项逐一加载的一种实现方式,以及加入渐显动画...
  • (亲测有效)解决windows11无法使用1500000波特率的问题
  • (一)SvelteKit教程:hello world
  • (杂交版)植物大战僵尸
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • .NET DevOps 接入指南 | 1. GitLab 安装
  • .NET Standard / dotnet-core / net472 —— .NET 究竟应该如何大小写?
  • .NET 药厂业务系统 CPU爆高分析
  • .NET/C#⾯试题汇总系列:集合、异常、泛型、LINQ、委托、EF!(完整版)
  • @Transactional注解下,循环取序列的值,但得到的值都相同的问题