当前位置: 首页 > news >正文

Flink广播流 BroadcastStream

文章目录

  • 前言
  • BroadcastStream代码示例
  • Broadcast 使用注意事项


前言

Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现,并将MapStateDescriptor作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。

  4. 处理数据:在process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunctionBroadcastProcessFunction类型的处理函数。

广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。

总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。


BroadcastStream代码示例

功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中  ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+"   " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
//				System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+"  "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存

相关文章:

  • 如何从 Mac 电脑外部硬盘恢复删除的数据文件
  • 【Python】Leetcode 240. 搜索二维矩阵 II - 削减矩阵+递归,击败88%
  • 《智能便利,畅享便利柜平台的架构奇妙之旅》
  • 人形机器人进展:IEEE Robotics出版双臂通用协同机械手操作架构
  • WPF 两个程序之间传递参数(shell32.dll)
  • #QT(一种朴素的计算器实现方法)
  • 腾讯云服务器CVM_云主机_云计算服务器_弹性云服务器
  • 网络基础 - 预备知识(协议、网络协议、网络传输流程、地址管理)
  • 基础---nginx 启动不了,跟 Apache2 服务冲突
  • <Senior High School Math>: inequality question
  • 【C/C++ 学习笔记】指针
  • 【OceanBase诊断调优】 —— 敏捷诊断工具obdiag一键收集诊断信息实践
  • ChatGPT的核心技术
  • GoLang:云原生时代致力于构建高性能服务器的后端语言
  • 实现两栏布局
  • 5、React组件事件详解
  • Angular 响应式表单 基础例子
  • canvas实际项目操作,包含:线条,圆形,扇形,图片绘制,图片圆角遮罩,矩形,弧形文字...
  • GraphQL学习过程应该是这样的
  • JavaScript中的对象个人分享
  • Just for fun——迅速写完快速排序
  • React Transition Group -- Transition 组件
  • React的组件模式
  • SpiderData 2019年2月25日 DApp数据排行榜
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 从setTimeout-setInterval看JS线程
  • 订阅Forge Viewer所有的事件
  • 前端设计模式
  • 说说动画卡顿的解决方案
  • 应用生命周期终极 DevOps 工具包
  • 原生Ajax
  • Linux权限管理(week1_day5)--技术流ken
  • Nginx惊现漏洞 百万网站面临“拖库”风险
  • ​TypeScript都不会用,也敢说会前端?
  • # Swust 12th acm 邀请赛# [ K ] 三角形判定 [题解]
  • #git 撤消对文件的更改
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • (06)Hive——正则表达式
  • (26)4.7 字符函数和字符串函数
  • (AngularJS)Angular 控制器之间通信初探
  • (LeetCode 49)Anagrams
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (汇总)os模块以及shutil模块对文件的操作
  • (三分钟了解debug)SLAM研究方向-Debug总结
  • ****** 二十三 ******、软设笔记【数据库】-数据操作-常用关系操作、关系运算
  • ... 是什么 ?... 有什么用处?
  • .net core 6 使用注解自动注入实例,无需构造注入 autowrite4net
  • .net 打包工具_pyinstaller打包的exe太大?你需要站在巨人的肩膀上-VC++才是王道
  • .net 受管制代码
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • .Net+SQL Server企业应用性能优化笔记4——精确查找瓶颈
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .net中我喜欢的两种验证码
  • .net专家(高海东的专栏)