当前位置: 首页 > news >正文

#调用传感器数据_Flink使用函数之监控传感器温度上升提醒

使用函数ProcessFunction介绍

Flink的ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块:

  • 事件 (数据流元素)

  • 状态 (容错和一致性)

  • 定时器 (事件时间和处理时间)

DataStream API提供了ProcessFunction转换算子,可以访问时间戳、注册定时事件。输出特定的一些事件。

ProcessFunction 是一种提供对 KeyedState 和定时器访问的 FlatMapFunction。每在输入流中接收到一个事件,就会调用来此函数来处理。对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState。

Timers 定时器可以对处理时间和事件时间的变化做一些处理。每次调用 processElement() 都可以获得一个 Context 对象,通过该对象可以访问元素的事件时间戳以及 TimerService。TimerService 可以为尚未发生的事件时间/处理时间实注册回调。当定时器到达某个时刻时,会调用 onTimer() 方法。在调用期间,所有状态再次限定为定时器创建的键,允许定时器操作 KeyedState。

ProcessFunction使用

模拟案例场景:传感器状态信息(这里举例:温度)会隔几秒上传,如果这次上传的温度

比上一次上传的温度高,温度10秒以内都是比较高,就进行持续的提示。

1、这里传感器状态消息模拟从socket接收,消息格式包含二个字段分别是:传感器标识id,传感器温度。

2、接收消息之后对数据流按 传感器标识Id 进行分组,对于温度比上一次高的消息,设置定时器10秒以内如果都是对比上一次温度高,则定时进行提示,如果10秒内对比上一次温度低,则认为温度没有升高,则删除定时器,取消提示。

传感器消息类SensorReading,代码如下:

public class SensorReading {    // 传感器 id    public String sensorId;    // 时间戳    public String timeStamp;    // 温度    public Double temperature;    // 状态描述    public  String lowOrhigt;    // 状态标识    public String status;    public SensorReading() {}    public  SensorReading(String sensorId,String timeStamp,Double temperature){        this.sensorId = sensorId;        this.timeStamp = timeStamp;        this.temperature = temperature;    }    public  SensorReading(String sensorId,Double temperature,String status){        this.sensorId = sensorId;        this.temperature = temperature;        this.status=status;    }    public  SensorReading(String sensorId,Double temperature){        this.sensorId = sensorId;        this.temperature = temperature;    }    @Override    public String toString() {        return "SensorReading{" +                "sensorId='" + sensorId + '\'' +                ", timeStamp=" + timeStamp +                ", temperature=" + temperature +                ", lowOrhigt=" + lowOrhigt +                ", status=" + status +                '}';    }}

自定义TempIncreKeyedProcessFunction类继承自KeyedProcessFunction,代码如下:

可仔细阅读代码注释

/** * @author: Created By yanshien * @company ChinaUnicom Software ysn * @date: 2020-12-24 15:13 * @version: v1.0 * @description: 监控传感器温度是否上升 **/public class TempIncreKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {    ValueState lastTempState;  // 上一次温度值    ValueState timerTsState;    //  注册定时器的时间戳    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        lastTempState=getRuntimeContext().getState(new ValueStateDescriptor<>("last-temp", Double.class));        timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("timer-ts", Long.class));    }    @Override    public void processElement(SensorReading sensorReading, Context context, Collector collector) throws Exception {        // 先取出上一次状态        Double lastTemp = lastTempState.value();        Long timerTs = timerTsState.value();        // 更新温度值        lastTempState.update(sensorReading.temperature);        // 当前温度值和上次温度进行比较,如果温度上升,且没有定时器,注册当前时间10s后进行提醒        // System.out.println( sensorReading.temperature);        if (lastTemp == null) {            lastTemp = sensorReading.temperature;        }        if (timerTs == null) {            timerTs = 0L;        }        if( sensorReading.temperature > lastTemp && timerTs == 0 ){            // 注册当前时间10s后的定时器            Long ts = context.timerService().currentProcessingTime()  +10000L;            context.timerService().registerProcessingTimeTimer(ts);            timerTsState.update(ts);        // 如果温度下降,那么删除定时器        } else if( sensorReading.temperature < lastTemp ){            context.timerService().deleteProcessingTimeTimer(timerTs);            timerTsState.clear();        }    }    @Override    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {        out.collect("传感器" + ctx.getCurrentKey() + "的温度连续" + 10000L/1000 + "秒连续上升");        timerTsState.clear();    }}

主函数代码如下:

/** * @author: Created By yanshien * @company ChinaUnicom Software ysn * @date: 2020-12-24 15:13 * @version: v1.0 * @description: 监控传感器温度是否上升 **/public class ProcessFunctionDemo {    public static void main(String[] args) throws Exception {        // 创建流处理的执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        /*设置使用EventTime作为Flink的时间处理标准,不指定默认是ProcessTime*/        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 这里为了便于理解,设置并行度为1,默认并行度是当前机器的cpu数量        env.setParallelism(1);        // 指定数据源 从socket的9000端口接收数据        // 在 cmd 打开 nc -L -p 9000         DataStream inputStream = env.socketTextStream("localhost", 9000);        DataStream sourceDS = inputStream.filter(new FilterFunction() {                    @Override                    public boolean filter(String line) throws Exception {                        if (null == line || "".equals(line)) {                            return false;                        }                        String[] lines = line.split(",");                        if (lines.length != 2) {                            return false;                        }                        return true;                    }                });        // map转换,将数据转换成SensorReading格式,第一个字段代表是传感器id,第二个字段的代表的是传感器温度        DataStream warningDS = sourceDS.map(new MapFunction() {            @Override            public SensorReading map(String line) throws Exception {                String[] lines = line.split(",");                return new SensorReading(lines[0], Double.valueOf(lines[1]));            }        }).keyBy(new KeySelector() {            @Override            public String getKey(SensorReading value) throws Exception {                return value.sensorId;            }        }).process(new TempIncreKeyedProcessFunction());        // 打印提醒信息        warningDS.print();        env.execute();    }

程序测试

在 cmd 打开执行命令:nc -L -p 9000,然后运行程序,从socket端发送数据如下:

sensor_1,29.7sensor_1,30.9sensor_1,32

测试10秒内发送温度比前一次高,则进行提醒输出,如下图所示:

401ac0dfd59098fc9e4451c9a32c7573.png

如果觉得文章能帮到您,欢迎关注、转发。

相关文章:

  • 内存使用率_微软深度优化Edge:跑分提高13% 内存占用下降27% 体积缩小近50%
  • 程序员转实施工程师_实施工程师到底做什么的?我认为比程序员接触面更广
  • bocketmq 多个消费者同时_RocketMQ 实战之快速入门
  • linux 安装mysql_linux下在线安装mysql(完整版)
  • mysql insert into 时间_关于MySQL的insert添加自动获取日期的now()的用法
  • mysql 存储模板文件_mysql 模板 存储
  • cyq.data 连接mysql_CYQ.Data V5 文本数据库支持SQL语句操作(实现原理解说)-阿里云开发者社区...
  • mysql组复制脑裂_MySQL 组复制介绍
  • 前端websocket更新列表数据_详解前端websocket服务器之数据传输协议
  • stopwords怎么用_用Python画词云图,展示“新冠肺炎”关键词
  • pygame简单小游戏代码_用pygame实现一个简单的垃圾分类小游戏(已获校级二等奖)!...
  • mysql数据迁移到redis_Mysql到Redis的数据迁移方法
  • linux mysql 客户端 安装配置_linux下mysql的安装部署
  • mysql虚拟表的创建_mysql虚拟表
  • win 10 查看mysql密码_win10系统登录mysql时报错1045?查看解决方案
  • 「前端早读君006」移动开发必备:那些玩转H5的小技巧
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • Babel配置的不完全指南
  • css的样式优先级
  • Invalidate和postInvalidate的区别
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • Joomla 2.x, 3.x useful code cheatsheet
  • JS题目及答案整理
  • mongo索引构建
  • Puppeteer:浏览器控制器
  • SAP云平台里Global Account和Sub Account的关系
  • select2 取值 遍历 设置默认值
  • UEditor初始化失败(实例已存在,但视图未渲染出来,单页化)
  • vue-cli在webpack的配置文件探究
  • yii2权限控制rbac之rule详细讲解
  • 开源SQL-on-Hadoop系统一览
  • 前端路由实现-history
  • 我从编程教室毕业
  • 译米田引理
  • 容器镜像
  • ​什么是bug?bug的源头在哪里?
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (补)B+树一些思想
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (附源码)计算机毕业设计大学生兼职系统
  • (六)c52学习之旅-独立按键
  • (六)激光线扫描-三维重建
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (原創) 未来三学期想要修的课 (日記)
  • (转)Scala的“=”符号简介
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • .NET Windows:删除文件夹后立即判断,有可能依然存在
  • .NET 材料检测系统崩溃分析
  • .NET 常见的偏门问题
  • .NET6 开发一个检查某些状态持续多长时间的类
  • .NET分布式缓存Memcached从入门到实战
  • .NET与java的MVC模式(2):struts2核心工作流程与原理
  • .sh