当前位置: 首页 > news >正文

二百零三、Flume——Flume实时采集数据频率为1s的高频率Kafka数据直接写入ODS层表的HDFS文件路径下

一、目的

在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

二、使用场景

静态排队数据是数据频率为1s的数据类型代表,数据量很大、频率很高,因此搞定了静态排队数据的采集就搞定了这一类高频率数据的实时采集问题

1台雷达每日的静态排队数据规模是25MB,10台雷达的数据规模则是250MB

三、静态排队数据的配置文件


## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 10240000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

四、Flume写入HDFS结果

Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

五、ODS表刷新分区后查验数据

(一)刷新表分区

MSCK REPAIR TABLE ods_queue;

(二)查看表数据

select * from ods_queue;

六、注意点

(一)配置文件中的重点是红色标记的几点

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 10240000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

(二)这几个重点参数的含义

序号Flume参数参数含义
1round是否启用时间上的”舍弃”,如果启用,则会影响除了%t的其他所有时间表达式                                       默认值:false
2roundValue多少时间单位创建一个新的文件夹
3roundUnit重新定义时间单位
4rollSize当临时文件达到该大小(单位:bytes)时,滚动成目标文件;默认值:1024byte            如果设置成0,则表示不根据临时文件大小来滚动文件
5rollCount当events数据达到该数量时候,将临时文件滚动成目标文件;默认值:10               如果设置成0,则表示不根据events数据来滚动文件
6rollInterval多久将临时文件滚动成最终目标文件,单位:秒;默认值:30s                    如果设置成0,则表示不根据时间来滚动文件;
7idleTimeout当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;            默认值:0
8minBlockReplicas写入HDFS文件块的最小副本数,一般配置成1才能正确滚动文件

更多Flume配置文件参数含义请看鄙人另一篇博客

一百九十一、Flume——Flume配置文件各参数含义(持续完善中)

http://t.csdnimg.cn/o5XbGicon-default.png?t=N7T8http://t.csdnimg.cn/o5XbG

就先这样吧,如果有问题的话后面再更新!!!

相关文章:

  • 三行Python代码即可将视频转Gif
  • ASP.NETWeb开发(C#版)-day1-C#基础+实操
  • 【SA8295P 源码分析 (三)】121 - MAX9295A 加串器芯片手册分析 及初始化参数分析
  • 在 HarmonyOS 上实现 ArkTS 与 H5 的交互
  • LeetCode-2760. 最长奇偶子数组-滑动窗口暴力
  • 基于Matlab+ AlexNet神经网络的动物识别系统
  • 基于Gin+Gorm框架搭建MVC模式的Go语言企业级后端系统
  • Windows客户端开发框架WPF简介
  • 3.5 Windows驱动开发:应用层与内核层内存映射
  • 二维码智慧门牌管理系统升级解决方案:实现服务状态实时监控
  • JPA概述
  • OceanBase杨冰:完全自研,才能逢山开路遇水搭桥
  • 服务器数据恢复—磁盘出现坏道掉线导致raid5阵列崩溃的数据恢复案例
  • 服务器数据恢复—VMware虚拟化下误操作导致服务器崩溃的数据恢复案例
  • 什么是自动化测试框架?
  • 分享一款快速APP功能测试工具
  • @angular/forms 源码解析之双向绑定
  • E-HPC支持多队列管理和自动伸缩
  • ES6之路之模块详解
  • JS基础篇--通过JS生成由字母与数字组合的随机字符串
  • node 版本过低
  • spring + angular 实现导出excel
  • TypeScript实现数据结构(一)栈,队列,链表
  • Web标准制定过程
  • 第十八天-企业应用架构模式-基本模式
  • 前嗅ForeSpider教程:创建模板
  • 容器化应用: 在阿里云搭建多节点 Openshift 集群
  • 什么软件可以剪辑音乐?
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • 我看到的前端
  • 学习Vue.js的五个小例子
  • 用element的upload组件实现多图片上传和压缩
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​草莓熊python turtle绘图代码(玫瑰花版)附源代码
  • # 达梦数据库知识点
  • #1014 : Trie树
  • #etcd#安装时出错
  • $(selector).each()和$.each()的区别
  • (06)金属布线——为半导体注入生命的连接
  • (ZT)出版业改革:该死的死,该生的生
  • (利用IDEA+Maven)定制属于自己的jar包
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(七)
  • (十一)图像的罗伯特梯度锐化
  • (一)Linux+Windows下安装ffmpeg
  • (一)python发送HTTP 请求的两种方式(get和post )
  • (已解决)什么是vue导航守卫
  • .net CHARTING图表控件下载地址
  • .net core Swagger 过滤部分Api
  • .NET Core 版本不支持的问题
  • .NET Project Open Day(2011.11.13)
  • .NET 使用 ILRepack 合并多个程序集(替代 ILMerge),避免引入额外的依赖
  • .NET/C# 使窗口永不激活(No Activate 永不获得焦点)
  • [20171106]配置客户端连接注意.txt
  • [AI]文心一言出圈的同时,NLP处理下的ChatGPT-4.5最新资讯
  • [BUUCTF 2018]Online Tool(特详解)