当前位置: 首页 > news >正文

【实战-08】 flink自定义Map中的变量的行为

场景

自定义Map或者别的算子的时候,有时候需要定义一些类变量,在flink内部高并发的情况下需要正确理解这些变量的行为

代码

package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//对于自定义函数中的变量,只有内置的状态是完全按照flink内置的 keyBy行为来的//如果是自定义的缓存比如ArrayList 则可能不会按照预期的行为public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
//     public ValueState<Integer> counter;//存储数据条数
//     public ValueState<String> element;//存储临时数据
//     @Override
//     public void open(Configuration parameters) throws Exception {
//         counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
//         element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
//     }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
//        if (counter.value() == null) {
//            counter.update(1);//遇见第一条数据的时候,计数器为1
//        } else {
//            counter.update(counter.value() + 1);
//        }
//        if (element.value() == null) {
//            element.update(s);//element只存储上一次到来的数据
//        }else {
//            element.update(element.value()+s);
//        }
//        if (counter.value() == 2) {
//            String re = element.value();
//            //发出结果之后清楚状态
//            counter.clear();
//            element.clear();
//            return re;
//        }else {
//            return "null";
//        }}
}

分析

keyBy之后,理论上相同key的会在map中用同样的处理逻辑,我们的预期行为是输出:bb,cc,dd
但是用ArrayList实现的逻辑最终输出却是:bb,bc,cc,dd
用ValueState的输出是:bb,cc,dd
这说明了,keBy后的逻辑,ArrayList不会按照预期的行为执行。这是因为在flink中,当多个并发的时候,多个key如果落入同一个线程
则当前线程的valueState是和某一个key绑定的,符合flink预期行为,但是ArrayList以及其它你定义的变量则不做保证, 它是线程级别的局部变量, 这点要注意。

相关文章:

  • 深入JVM:解析OOM的三大场景,原因及实战解决方案
  • 论文阅读NAM:Normalization-based Attention Module
  • 错误:comparison method violates its general contract
  • 智慧应急:构建全方位、立体化的安全保障网络
  • vue使用gitshot生成gif
  • 【Langchain多Agent实践】一个有推销功能的旅游聊天机器人
  • 如何在Window系统部署BUG管理软件并结合内网穿透实现远程管理本地BUG
  • SpringMVC 学习(二)之第一个 SpringMVC 案例
  • 解释什么是内连接、左连接和右连接,并给出每种连接的SQL示例
  • day03_登录注销(前端接入登录,异常处理, 图片验证码,获取用户信息接口,退出功能)
  • 【pytorch矩阵应用】
  • 哈工大中文mistral介绍(Chinese-Mixtral-8x7B)
  • Redis实现滑动窗口限流
  • 微服务之qiankun主项目+子项目搭建
  • C++:封装
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • Brief introduction of how to 'Call, Apply and Bind'
  • java小心机(3)| 浅析finalize()
  • redis学习笔记(三):列表、集合、有序集合
  • vue自定义指令实现v-tap插件
  • 爱情 北京女病人
  • 百度地图API标注+时间轴组件
  • 二维平面内的碰撞检测【一】
  • 好的网址,关于.net 4.0 ,vs 2010
  • 技术发展面试
  • 微服务框架lagom
  • 异步
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • raise 与 raise ... from 的区别
  • 如何在招聘中考核.NET架构师
  • ​水经微图Web1.5.0版即将上线
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • #我与Java虚拟机的故事#连载14:挑战高薪面试必看
  • (4.10~4.16)
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (poj1.2.1)1970(筛选法模拟)
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)spring boot智能服药提醒app 毕业设计 102151
  • (十八)三元表达式和列表解析
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (转)winform之ListView
  • (转载)hibernate缓存
  • .Net 6.0 处理跨域的方式
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .NET Core实战项目之CMS 第十二章 开发篇-Dapper封装CURD及仓储代码生成器实现
  • .NET MVC第三章、三种传值方式
  • .Net Remoting(分离服务程序实现) - Part.3
  • .NET 药厂业务系统 CPU爆高分析
  • .NET 依赖注入和配置系统
  • .Net6使用WebSocket与前端进行通信
  • @EnableAsync和@Async开始异步任务支持
  • [ 第一章] JavaScript 简史
  • [2018/11/18] Java数据结构(2) 简单排序 冒泡排序 选择排序 插入排序
  • [E链表] lc83. 删除排序链表中的重复元素(单链表+模拟)
  • [Godot] 3D拾取