当前位置: 首页 > news >正文

Flink 生产问题(数据倾斜)

Flink 生产问题(数据倾斜)

问题概述

  • 任务节点频繁出现反压,但是增加并行度后并不能解决问题;
  • 部分节点出现 OOM 异常,原因是大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
产生数据倾斜的原因:
  • 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;

  • 技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。

解决问题思路:
  • 业务上要尽量避免热点 key 的设计,例如可以把北京、上海等热点城市分成不同的区域,并进行单独处理;

  • 技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;。

数据倾斜场景案例和解决方案

数据倾斜场景:

统计各省下单次数(北京、上海等几个城市的订单量远远超过其他地区)

解决思路(二次聚合):
  • 首先把分组的 key 打散,加随机数前缀;
  • 对打散后的数据进行聚合;
  • 把打散的 key 去除随机树前缀还原为真正的 key;
  • 二次 KeyBy 进行结果统计,然后输出。

Java 代码如下:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Random;public class OrderCountByProvince {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟输入数据流:订单 (省份, 下单次数)DataStream<Tuple2<String, Integer>> orders = env.fromElements(Tuple2.of("Beijing", 1),Tuple2.of("Shanghai", 1),Tuple2.of("Guangdong", 1),Tuple2.of("Beijing", 1),Tuple2.of("Shanghai", 1),Tuple2.of("Guangdong", 1),Tuple2.of("Beijing", 1),Tuple2.of("Shanghai", 1));// 添加随机前缀,缓解省份热点问题DataStream<Tuple2<String, Integer>> ordersWithPrefix = orders.map(new AddRandomPrefix());// 按带有前缀的省份进行分区统计DataStream<Tuple2<String, Integer>> orderCountsWithPrefix = ordersWithPrefix.keyBy(order -> order.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}});// 移除随机前缀,合并前缀后的结果DataStream<Tuple2<String, Integer>> orderCounts = orderCountsWithPrefix.map(new RemoveRandomPrefix()).keyBy(order -> order.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}});// 打印结果orderCounts.print();// 执行任务env.execute("Order Count By Province with Hotspot Mitigation");}// 添加随机前缀的 MapFunctionpublic static class AddRandomPrefix implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {private static final Random random = new Random();@Overridepublic Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {String randomPrefix = String.valueOf(random.nextInt(10));return Tuple2.of(randomPrefix + "-" + value.f0, value.f1);}}// 移除随机前缀的 MapFunctionpublic static class RemoveRandomPrefix implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {@Overridepublic Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {String province = value.f0.split("-", 2)[1];return Tuple2.of(province, value.f1);}}
}

相关文章:

  • Java类和对象(五)—— 抽象类、接口、Object类和内部类
  • JAVA开发 基于最长公共子序列来计算两个字符串之间的重复率
  • 删除链表的倒数第N个节点-力扣
  • GitLab的原理及应用详解(三)
  • 深入解析kube-scheduler的算法自定义插件
  • 软件系统安全设计规范(word原件)
  • 使用Golang开发一个HTTP客户端请求命令行工具
  • Spring 中常用的手动装载 bean 方法
  • 代码随想录训练营Day 38|力扣435. 无重叠区间、763.划分字母区间、56. 合并区间
  • docker实战之搭建MYSQL8.0主从同步
  • C++11function包装器的使用
  • 如何使用Java发送SOAP请求与webservice 服务进行通信
  • 如何搭建springBoot项目中的全局异常处理和自定义异常处理
  • golang通过go-aci适配神通数据库
  • 【全网最全】2024电工杯数学建模B题问题一14页论文+19建模过程代码+py代码+2种保奖思路+数据等(后续会更新成品论文等)
  • ----------
  • 4个实用的微服务测试策略
  • canvas 高仿 Apple Watch 表盘
  • create-react-app做的留言板
  • HashMap ConcurrentHashMap
  • input实现文字超出省略号功能
  • Javascript设计模式学习之Observer(观察者)模式
  • Java多态
  • Linux中的硬链接与软链接
  • TypeScript实现数据结构(一)栈,队列,链表
  • 第2章 网络文档
  • 关于springcloud Gateway中的限流
  • 和 || 运算
  • 面试遇到的一些题
  • 前端相关框架总和
  • 前嗅ForeSpider教程:创建模板
  • 区块链技术特点之去中心化特性
  • 数据科学 第 3 章 11 字符串处理
  • ​LeetCode解法汇总518. 零钱兑换 II
  • ​七周四次课(5月9日)iptables filter表案例、iptables nat表应用
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • # 20155222 2016-2017-2 《Java程序设计》第5周学习总结
  • (C11) 泛型表达式
  • (附源码)python旅游推荐系统 毕业设计 250623
  • (附源码)ssm本科教学合格评估管理系统 毕业设计 180916
  • (论文阅读26/100)Weakly-supervised learning with convolutional neural networks
  • (已解决)报错:Could not load the Qt platform plugin “xcb“
  • .NET Core 项目指定SDK版本
  • .NET CORE使用Redis分布式锁续命(续期)问题
  • .NET Standard / dotnet-core / net472 —— .NET 究竟应该如何大小写?
  • .NET 中使用 Mutex 进行跨越进程边界的同步
  • .NET/C# 阻止屏幕关闭,阻止系统进入睡眠状态
  • .NET中使用Redis (二)
  • .net中应用SQL缓存(实例使用)
  • .pub是什么文件_Rust 模块和文件 - 「译」
  • .sys文件乱码_python vscode输出乱码
  • ??eclipse的安装配置问题!??
  • ??如何把JavaScript脚本中的参数传到java代码段中
  • @vue/cli脚手架
  • [ vulhub漏洞复现篇 ] AppWeb认证绕过漏洞(CVE-2018-8715)